diff options
| author | Tomas Restrepo <tomasr@apache.org> | 2007-05-10 23:02:46 +0000 |
|---|---|---|
| committer | Tomas Restrepo <tomasr@apache.org> | 2007-05-10 23:02:46 +0000 |
| commit | c10d31cbbbed7b2997816cb9d296c679073b8aa5 (patch) | |
| tree | deb4f80768418007e4fa871d231f4d5620905d9b /dotnet/Qpid.Client/Client | |
| parent | 29abd268f7005cf0a952d9c77cf1bbca28d49f28 (diff) | |
| download | qpid-python-c10d31cbbbed7b2997816cb9d296c679073b8aa5.tar.gz | |
Merged revisions 537015-537026 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r537015 | tomasr | 2007-05-10 17:16:49 -0500 (Thu, 10 May 2007) | 1 line
QPID-435: Fix HeadersExchangeTest
........
r537019 | tomasr | 2007-05-10 17:25:01 -0500 (Thu, 10 May 2007) | 1 line
QPID-441 Fix handling of bounced messages
........
r537026 | tomasr | 2007-05-10 17:46:46 -0500 (Thu, 10 May 2007) | 1 line
QPID-398 SSL support for .NET client
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@537031 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client')
23 files changed, 1047 insertions, 232 deletions
diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs index a59670ef5a..da8498f938 100644 --- a/dotnet/Qpid.Client/Client/AMQConnection.cs +++ b/dotnet/Qpid.Client/Client/AMQConnection.cs @@ -672,9 +672,9 @@ namespace Qpid.Client } } - public bool AttemptReconnection(String host, int port, bool useSSL) + public bool AttemptReconnection(String host, int port, SslOptions sslConfig) { - IBrokerInfo bd = new AmqBrokerInfo("amqp", host, port, useSSL); + IBrokerInfo bd = new AmqBrokerInfo("amqp", host, port, sslConfig); _failoverPolicy.setBroker(bd); @@ -708,10 +708,10 @@ namespace Qpid.Client _transport = LoadTransportFromAssembly(brokerDetail.getHost(), brokerDetail.getPort(), assemblyName, transportType); */ - _transport = new BlockingSocketTransport(brokerDetail.Host, brokerDetail.Port, this); + _transport = new BlockingSocketTransport(); // Connect. - _transport.Open(); + _transport.Connect(brokerDetail, this); _protocolWriter = new ProtocolWriter(_transport.ProtocolWriter, _protocolListener); _protocolSession = new AMQProtocolSession(_transport.ProtocolWriter, _transport, this); _protocolListener.ProtocolSession = _protocolSession; diff --git a/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs b/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs new file mode 100644 index 0000000000..ec5944bdac --- /dev/null +++ b/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs @@ -0,0 +1,45 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+using Qpid.Common;
+using Qpid.Protocol;
+
+namespace Qpid.Client
+{
+ [Serializable]
+ public class AMQNoConsumersException : AMQUndeliveredException
+ {
+ public AMQNoConsumersException(string message)
+ : this(message, null)
+ {
+ }
+
+ public AMQNoConsumersException(string message, object bounced)
+ : base(AMQConstant.NO_CONSUMERS.Code, message, bounced)
+ {
+ }
+ protected AMQNoConsumersException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/AMQNoRouteException.cs b/dotnet/Qpid.Client/Client/AMQNoRouteException.cs new file mode 100644 index 0000000000..8f0db1c3d5 --- /dev/null +++ b/dotnet/Qpid.Client/Client/AMQNoRouteException.cs @@ -0,0 +1,46 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+using Qpid.Common;
+using Qpid.Protocol;
+
+namespace Qpid.Client
+{
+ [Serializable]
+ public class AMQNoRouteException : AMQUndeliveredException
+ {
+ public AMQNoRouteException(string message)
+ : this(message, null)
+ {
+ }
+
+ public AMQNoRouteException(string message, object bounced)
+ : base(AMQConstant.NO_ROUTE.Code, message, bounced)
+ {
+ }
+
+ protected AMQNoRouteException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs b/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs index f26756ccad..9ae1a49473 100644 --- a/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs +++ b/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs @@ -36,6 +36,7 @@ namespace Qpid.Client private int _port = 5672; private string _transport = "amqp"; private Hashtable _options = new Hashtable(); + private SslOptions _sslOptions; public AmqBrokerInfo() { @@ -182,6 +183,21 @@ namespace Qpid.Client } } + public AmqBrokerInfo(string transport, string host, int port, SslOptions sslConfig) + : this() + { + _transport = transport; + _host = host; + _port = port; + + if ( sslConfig != null ) + { + SetOption(BrokerInfoConstants.OPTIONS_SSL, "true"); + _sslOptions = sslConfig; + } + } + + public string Host { get { return _host; } @@ -200,6 +216,11 @@ namespace Qpid.Client set { _transport = value; } } + public SslOptions SslOptions + { + get { return _sslOptions; } + } + public string GetOption(string key) { return (string)_options[key]; diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 07650c170b..3471ac3640 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -28,6 +28,7 @@ using Qpid.Client.Message; using Qpid.Collections; using Qpid.Framing; using Qpid.Messaging; +using Qpid.Protocol; namespace Qpid.Client { @@ -568,8 +569,14 @@ namespace Qpid.Client if (_logger.IsDebugEnabled) { _logger.Debug("Message received in session with channel id " + _channelId); - } - _queue.EnqueueBlocking(message); + } + if ( message.DeliverBody == null ) + { + ReturnBouncedMessage(message); + } else + { + _queue.EnqueueBlocking(message); + } } public int DefaultPrefetch @@ -986,5 +993,42 @@ namespace Qpid.Client // FIXME: lock FailoverMutex here? _connection.ProtocolWriter.Write(ackFrame); } + + /// <summary> + /// Handle a message that bounced from the server, creating + /// the corresponding exception and notifying the connection about it + /// </summary> + /// <param name="message">Unprocessed message</param> + private void ReturnBouncedMessage(UnprocessedMessage message) + { + try + { + AbstractQmsMessage bouncedMessage = + _messageFactoryRegistry.CreateMessage( + 0, false, message.ContentHeader, + message.Bodies + ); + + int errorCode = message.BounceBody.ReplyCode; + string reason = message.BounceBody.ReplyText; + _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + AMQException exception; + if ( errorCode == AMQConstant.NO_CONSUMERS.Code ) + { + exception = new AMQNoConsumersException(reason, bouncedMessage); + } else if ( errorCode == AMQConstant.NO_ROUTE.Code ) + { + exception = new AMQNoRouteException(reason, bouncedMessage); + } else + { + exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage); + } + _connection.ExceptionReceived(exception); + } catch ( Exception ex ) + { + _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex); + } + + } } } diff --git a/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs b/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs index aa79749b41..dbd09da49c 100644 --- a/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs +++ b/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs @@ -97,7 +97,8 @@ namespace Qpid.Client.Failover // if _host has value then we are performing a redirect. if (_host != null) { - failoverSucceeded = _connection.AttemptReconnection(_host, _port, false); + // todo: fix SSL support! + failoverSucceeded = _connection.AttemptReconnection(_host, _port, null); } else { diff --git a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs index 78526f906f..0bd65a1ace 100644 --- a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs +++ b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs @@ -32,7 +32,7 @@ namespace Qpid.Client.Handler public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) { - _logger.Debug("New JmsBounce method received"); + _logger.Debug("New Basic.Return method received"); UnprocessedMessage msg = new UnprocessedMessage(); msg.DeliverBody = null; msg.BounceBody = (BasicReturnBody) evt.Method; diff --git a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs index 0ce8a393c9..7f88dd8219 100644 --- a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs +++ b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs @@ -44,11 +44,20 @@ namespace Qpid.Client.Handler AMQFrame frame = ChannelCloseOkBody.CreateAMQFrame(evt.ChannelId); evt.ProtocolSession.WriteFrame(frame); - // HACK + if ( errorCode != AMQConstant.REPLY_SUCCESS.Code ) { - _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception"); - evt.ProtocolSession.AMQConnection.ExceptionReceived(new AMQChannelClosedException(errorCode, "Error: " + reason)); + _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception"); + if ( errorCode == AMQConstant.NO_CONSUMERS.Code ) + throw new AMQNoConsumersException(reason); + if ( errorCode == AMQConstant.NO_ROUTE.Code ) + throw new AMQNoRouteException(reason); + if ( errorCode == AMQConstant.INVALID_ARGUMENT.Code ) + throw new AMQInvalidArgumentException(reason); + if ( errorCode == AMQConstant.INVALID_ROUTING_KEY.Code ) + throw new AMQInvalidRoutingKeyException(reason); + // any other + throw new AMQChannelClosedException(errorCode, "Error: " + reason); } evt.ProtocolSession.ChannelClosed(evt.ChannelId, errorCode, reason); } diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs index 42169f31b3..0ca443e3bb 100644 --- a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs +++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs @@ -271,7 +271,7 @@ namespace Qpid.Client.Protocol id = _queueId++; } - return "tmp_" + _connection.Transport.getLocalEndPoint() + "_" + id; + return "tmp_" + _connection.Transport.LocalEndpoint + "_" + id; } } } diff --git a/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs b/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs index 78f13c9f42..944d21ad92 100644 --- a/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs +++ b/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs @@ -92,6 +92,8 @@ namespace Qpid.Client.Security if ( _mechanism2HandlerMap == null )
_mechanism2HandlerMap = new Hashtable();
+ if ( !_mechanism2HandlerMap.Contains(ExternalSaslClient.Mechanism) )
+ _mechanism2HandlerMap.Add(ExternalSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
if ( !_mechanism2HandlerMap.Contains(CramMD5SaslClient.Mechanism) )
_mechanism2HandlerMap.Add(CramMD5SaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
if ( !_mechanism2HandlerMap.Contains(PlainSaslClient.Mechanism) )
diff --git a/dotnet/Qpid.Client/Client/SslOptions.cs b/dotnet/Qpid.Client/Client/SslOptions.cs new file mode 100644 index 0000000000..a6488d99ea --- /dev/null +++ b/dotnet/Qpid.Client/Client/SslOptions.cs @@ -0,0 +1,81 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Security.Cryptography.X509Certificates;
+
+namespace Qpid.Client
+{
+ /// <summary>
+ /// Configures SSL-related options to connect to an AMQP broker.
+ /// </summary>
+ /// <remarks>
+ /// If the server certificate is not trusted by the client,
+ /// connection will fail. However, you can set the
+ /// <see cref="IgnoreValidationErrors"/> property to true
+ /// to ignore any certificate verification errors for debugging purposes.
+ /// </remarks>
+ public class SslOptions
+ {
+ private X509Certificate _clientCertificate;
+ private bool _ignoreValidationErrors;
+
+ /// <summary>
+ /// Certificate to present to the broker to authenticate
+ /// this client connection
+ /// </summary>
+ public X509Certificate ClientCertificate
+ {
+ get { return _clientCertificate; }
+ }
+
+ /// <summary>
+ /// If true, the validity of the broker certificate
+ /// will not be verified on connection
+ /// </summary>
+ public bool IgnoreValidationErrors
+ {
+ get { return _ignoreValidationErrors; }
+ }
+
+ /// <summary>
+ /// Initialize a new instance with default values
+ /// (No client certificate, don't ignore validation errors)
+ /// </summary>
+ public SslOptions()
+ {
+ }
+
+ /// <summary>
+ /// Initialize a new instance
+ /// </summary>
+ /// <param name="clientCertificate">
+ /// Certificate to use to authenticate the client to the broker
+ /// </param>
+ /// <param name="ignoreValidationErrors">
+ /// If true, ignore any validation errors when validating the server certificate
+ /// </param>
+ public SslOptions(X509Certificate clientCertificate, bool ignoreValidationErrors)
+ {
+ _clientCertificate = clientCertificate;
+ _ignoreValidationErrors = ignoreValidationErrors;
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs b/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs index d3add546fe..ca7ffad8b3 100644 --- a/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs +++ b/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs @@ -50,19 +50,18 @@ namespace Qpid.Client.Transport public Queue Read() { ByteBuffer buffer = byteChannel.Read(); + return DecodeAndTrace(buffer); + } + + public IAsyncResult BeginRead(AsyncCallback callback, object state) + { + return byteChannel.BeginRead(callback, state); + } - Queue frames = Decode(buffer); - - // TODO: Refactor to decorator. - if (_protocolTraceLog.IsDebugEnabled) - { - foreach (object o in frames) - { - _protocolTraceLog.Debug(String.Format("READ {0}", o)); - } - } - - return frames; + public Queue EndRead(IAsyncResult result) + { + ByteBuffer buffer = byteChannel.EndRead(result); + return DecodeAndTrace(buffer); } public void Write(IDataBlock o) @@ -72,10 +71,33 @@ namespace Qpid.Client.Transport { _protocolTraceLog.Debug(String.Format("WRITE {0}", o)); } - + // we should be doing an async write, but apparently + // the mentalis library doesn't queue async read/writes + // correctly and throws random IOException's. Stay sync for a while + //byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null); byteChannel.Write(Encode(o)); } + private void OnAsyncWriteDone(IAsyncResult result) + { + byteChannel.EndWrite(result); + } + + private Queue DecodeAndTrace(ByteBuffer buffer) + { + Queue frames = Decode(buffer); + + // TODO: Refactor to decorator. + if ( _protocolTraceLog.IsDebugEnabled ) + { + foreach ( object o in frames ) + { + _protocolTraceLog.Debug(String.Format("READ {0}", o)); + } + } + return frames; + } + private ByteBuffer Encode(object o) { SingleProtocolEncoderOutput output = new SingleProtocolEncoderOutput(); diff --git a/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs b/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs index 4a3ff385a8..0f8f341d48 100644 --- a/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs +++ b/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs @@ -18,13 +18,54 @@ * under the License. * */ +using System; using Qpid.Buffer; namespace Qpid.Client.Transport { - public interface IByteChannel - { - ByteBuffer Read(); - void Write(ByteBuffer buffer); - } + /// <summary> + /// Represents input/output channels that read + /// and write <see cref="ByteBuffer"/> instances + /// </summary> + public interface IByteChannel + { + /// <summary> + /// Read a <see cref="ByteBuffer"/> from the underlying + /// network stream and any configured filters + /// </summary> + /// <returns>A ByteBuffer, if available</returns> + ByteBuffer Read(); + /// <summary> + /// Begin an asynchronous read operation + /// </summary> + /// <param name="callback">Callback method to call when read operation completes</param> + /// <param name="state">State object</param> + /// <returns>An <see cref="System.IAsyncResult"/> object</returns> + IAsyncResult BeginRead(AsyncCallback callback, object state); + /// <summary> + /// End an asynchronous read operation + /// </summary> + /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param> + /// <returns>The <see cref="ByteBuffer"/> read</returns> + ByteBuffer EndRead(IAsyncResult result); + /// <summary> + /// Write a <see cref="ByteBuffer"/> to the underlying network + /// stream, going through any configured filters + /// </summary> + /// <param name="buffer"></param> + void Write(ByteBuffer buffer); + /// <summary> + /// Begin an asynchronous write operation + /// </summary> + /// <param name="buffer">Buffer to write</param> + /// <param name="callback">A callback to call when the operation completes</param> + /// <param name="state">State object</param> + /// <returns>An <see cref="System.IAsyncResult"/> object</returns> + IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state); + /// <summary> + /// End an asynchronous write operation + /// </summary> + /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param> + void EndWrite(IAsyncResult result); + } } diff --git a/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs b/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs index 4943a45d68..0379e582d6 100644 --- a/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs +++ b/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs @@ -18,6 +18,7 @@ * under the License. * */ +using System; using System.Collections; namespace Qpid.Client.Transport @@ -25,5 +26,7 @@ namespace Qpid.Client.Transport public interface IProtocolChannel : IProtocolWriter { Queue Read(); + IAsyncResult BeginRead(AsyncCallback callback, object state); + Queue EndRead(IAsyncResult result); } } diff --git a/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs b/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs new file mode 100644 index 0000000000..409b428c01 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs @@ -0,0 +1,38 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.IO;
+
+namespace Qpid.Client.Transport
+{
+ /// <summary>
+ /// Defines a way to introduce an arbitrary filtering
+ /// stream into the stream chain managed by <see cref="IoHandler"/>
+ /// </summary>
+ public interface IStreamFilter
+ {
+ /// <summary>
+ /// Creates a new filtering stream on top of another
+ /// </summary>
+ /// <param name="lowerStream">Next stream on the stack</param>
+ /// <returns>A new filtering stream</returns>
+ Stream CreateFilterStream(Stream lowerStream);
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Transport/ITransport.cs b/dotnet/Qpid.Client/Client/Transport/ITransport.cs index aebe58b439..3d918693bc 100644 --- a/dotnet/Qpid.Client/Client/Transport/ITransport.cs +++ b/dotnet/Qpid.Client/Client/Transport/ITransport.cs @@ -18,14 +18,15 @@ * under the License. * */ +using Qpid.Client.Qms; using Qpid.Client.Protocol; namespace Qpid.Client.Transport { public interface ITransport : IConnectionCloser { - void Open(); - string getLocalEndPoint(); + void Connect(IBrokerInfo broker, AMQConnection connection); + string LocalEndpoint { get; } IProtocolWriter ProtocolWriter { get; } } } diff --git a/dotnet/Qpid.Client/Client/Transport/IoHandler.cs b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs new file mode 100644 index 0000000000..8d1f04f662 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs @@ -0,0 +1,321 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using System.Threading;
+using log4net;
+using Qpid.Buffer;
+using Qpid.Client.Protocol;
+
+namespace Qpid.Client.Transport
+{
+ /// <summary>
+ /// Responsible for reading and writing
+ /// ByteBuffers from/to network streams, and handling
+ /// the stream filters
+ /// </summary>
+ public class IoHandler : IByteChannel, IDisposable
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(IoHandler));
+ private const int DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+ private Stream _topStream;
+ private IProtocolListener _protocolListener;
+ private int _readBufferSize;
+
+ public int ReadBufferSize
+ {
+ get { return _readBufferSize; }
+ set { _readBufferSize = value; }
+ }
+
+ /// <summary>
+ /// Initialize a new instance
+ /// </summary>
+ /// <param name="stream">Underlying network stream</param>
+ /// <param name="protocolListener">Protocol listener to report exceptions to</param>
+ public IoHandler(Stream stream, IProtocolListener protocolListener)
+ {
+ if ( stream == null )
+ throw new ArgumentNullException("stream");
+ if ( protocolListener == null )
+ throw new ArgumentNullException("protocolListener");
+
+ // initially, the stream at the top of the filter
+ // chain is the underlying network stream
+ _topStream = stream;
+ _protocolListener = protocolListener;
+ _readBufferSize = DEFAULT_BUFFER_SIZE;
+ }
+
+ /// <summary>
+ /// Adds a new filter on the top of the chain
+ /// </summary>
+ /// <param name="filter">Stream filter to put on top of the chain</param>
+ /// <remarks>
+ /// This should *only* be called during initialization. We don't
+ /// support changing the filter change after the first read/write
+ /// has been done and it's not thread-safe to boot!
+ /// </remarks>
+ public void AddFilter(IStreamFilter filter)
+ {
+ _topStream = filter.CreateFilterStream(_topStream);
+ }
+
+ #region IByteChannel Implementation
+ //
+ // IByteChannel Implementation
+ //
+
+ /// <summary>
+ /// Read a <see cref="ByteBuffer"/> from the underlying
+ /// network stream and any configured filters
+ /// </summary>
+ /// <returns>A ByteBuffer, if available</returns>
+ public ByteBuffer Read()
+ {
+ byte[] bytes = AllocateBuffer();
+
+ int numOctets = _topStream.Read(bytes, 0, bytes.Length);
+
+ return WrapByteArray(bytes, numOctets);
+ }
+
+ /// <summary>
+ /// Begin an asynchronous read operation
+ /// </summary>
+ /// <param name="callback">Callback method to call when read operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ public IAsyncResult BeginRead(AsyncCallback callback, object state)
+ {
+ byte[] bytes = AllocateBuffer();
+ ReadData rd = new ReadData(callback, state, bytes);
+
+ // only put a callback if the caller wants one.
+ AsyncCallback myCallback = null;
+ if ( callback != null )
+ myCallback = new AsyncCallback(OnAsyncReadDone);
+
+ IAsyncResult result = _topStream.BeginRead(
+ bytes, 0, bytes.Length, myCallback,rd
+ );
+ return new WrappedAsyncResult(result, bytes);
+ }
+
+ /// <summary>
+ /// End an asynchronous read operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param>
+ /// <returns>The <see cref="ByteBuffer"/> read</returns>
+ public ByteBuffer EndRead(IAsyncResult result)
+ {
+ WrappedAsyncResult theResult = (WrappedAsyncResult)result;
+ int bytesRead = _topStream.EndRead(theResult.InnerResult);
+ return WrapByteArray(theResult.Buffer, bytesRead);
+ }
+
+ /// <summary>
+ /// Write a <see cref="ByteBuffer"/> to the underlying network
+ /// stream, going through any configured filters
+ /// </summary>
+ /// <param name="buffer"></param>
+ public void Write(ByteBuffer buffer)
+ {
+ try
+ {
+ _topStream.Write(buffer.Array, buffer.Position, buffer.Limit); // FIXME
+ } catch ( Exception e )
+ {
+ _log.Error("Write caused exception", e);
+ _protocolListener.OnException(e);
+ }
+ }
+
+ /// <summary>
+ /// Begin an asynchronous write operation
+ /// </summary>
+ /// <param name="buffer">Buffer to write</param>
+ /// <param name="callback">A callback to call when the operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state)
+ {
+ try
+ {
+ return _topStream.BeginWrite(
+ buffer.Array, buffer.Position, buffer.Limit,
+ callback, state
+ );
+ } catch ( Exception e )
+ {
+ _log.Error("BeginWrite caused exception", e);
+ // not clear if an exception here should be propagated? we still
+ // need to propagate it upwards anyway!
+ _protocolListener.OnException(e);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// End an asynchronous write operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param>
+ public void EndWrite(IAsyncResult result)
+ {
+ try
+ {
+ _topStream.EndWrite(result);
+ } catch ( Exception e )
+ {
+ _log.Error("EndWrite caused exception", e);
+ // not clear if an exception here should be propagated?
+ _protocolListener.OnException(e);
+ //throw;
+ }
+ }
+ #endregion // IByteChannel Implementation
+
+ #region IDisposable Implementation
+ //
+ // IDisposable Implementation
+ //
+
+ public void Dispose()
+ {
+ if ( _topStream != null )
+ {
+ _topStream.Close();
+ }
+ }
+
+ #endregion // IDisposable Implementation
+
+ #region Private and Helper Classes/Methods
+ //
+ // Private and Helper Classes/Methods
+ //
+
+ private byte[] AllocateBuffer()
+ {
+ return new byte[ReadBufferSize];
+ }
+
+ private static ByteBuffer WrapByteArray(byte[] bytes, int size)
+ {
+ ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes);
+ byteBuffer.Limit = size;
+ byteBuffer.Flip();
+
+ return byteBuffer;
+ }
+
+
+ private static void OnAsyncReadDone(IAsyncResult result)
+ {
+ ReadData rd = (ReadData) result.AsyncState;
+ IAsyncResult wrapped = new WrappedAsyncResult(result, rd.Buffer);
+ rd.Callback(wrapped);
+ }
+
+ class ReadData
+ {
+ private object _state;
+ private AsyncCallback _callback;
+ private byte[] _buffer;
+
+ public object State
+ {
+ get { return _state; }
+ }
+
+ public AsyncCallback Callback
+ {
+ get { return _callback; }
+ }
+
+ public byte[] Buffer
+ {
+ get { return _buffer; }
+ }
+
+ public ReadData(AsyncCallback callback, object state, byte[] buffer)
+ {
+ _callback = callback;
+ _state = state;
+ _buffer = buffer;
+ }
+ }
+
+ class WrappedAsyncResult : IAsyncResult
+ {
+ private IAsyncResult _innerResult;
+ private byte[] _buffer;
+
+ #region IAsyncResult Properties
+ //
+ // IAsyncResult Properties
+ //
+ public bool IsCompleted
+ {
+ get { return _innerResult.IsCompleted; }
+ }
+
+ public WaitHandle AsyncWaitHandle
+ {
+ get { return _innerResult.AsyncWaitHandle; }
+ }
+
+ public object AsyncState
+ {
+ get { return _innerResult.AsyncState; }
+ }
+
+ public bool CompletedSynchronously
+ {
+ get { return _innerResult.CompletedSynchronously; }
+ }
+ #endregion // IAsyncResult Properties
+
+ public IAsyncResult InnerResult
+ {
+ get { return _innerResult; }
+ }
+ public byte[] Buffer
+ {
+ get { return _buffer; }
+ }
+
+ public WrappedAsyncResult(IAsyncResult result, byte[] buffer)
+ {
+ if ( result == null )
+ throw new ArgumentNullException("result");
+ if ( buffer == null )
+ throw new ArgumentNullException("buffer");
+
+ _innerResult = result;
+ _buffer = buffer;
+ }
+ }
+
+ #endregion // Private and Helper Classes/Methods
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs index 7a9ead0c06..e69de29bb2 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs @@ -1,117 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Net;
-using System.Net.Sockets;
-using log4net;
-using Qpid.Buffer;
-using Qpid.Client.Protocol;
-
-namespace Qpid.Client.Transport.Socket.Blocking
-{
- class BlockingSocketProcessor : IConnectionCloser
- {
- private static readonly ILog _log = LogManager.GetLogger(typeof(BlockingSocketProcessor));
-
- string _host;
- int _port;
- System.Net.Sockets.Socket _socket;
- private NetworkStream _networkStream;
- IByteChannel _byteChannel;
- IProtocolListener _protocolListener;
-
- public BlockingSocketProcessor(string host, int port, IProtocolListener protocolListener)
- {
- _host = host;
- _port = port;
- _protocolListener = protocolListener;
- _byteChannel = new ByteChannel(this);
- }
-
- /// <summary>
- /// Synchronous blocking connect.
- /// </summary>
- public void Connect()
- {
- _socket = new System.Net.Sockets.Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
-
- IPHostEntry ipHostInfo = Dns.Resolve(_host); // Note: don't fix this warning. We do this for .NET 1.1 compatibility.
- IPAddress ipAddress = ipHostInfo.AddressList[0];
-
- IPEndPoint ipe = new IPEndPoint(ipAddress, _port);
-
- _socket.Connect(ipe);
- _networkStream = new NetworkStream(_socket, true);
- }
-
- public string getLocalEndPoint()
- {
- return _socket.LocalEndPoint.ToString();
- }
-
- public void Write(ByteBuffer byteBuffer)
- {
- try
- {
- _networkStream.Write(byteBuffer.Array, byteBuffer.Position, byteBuffer.Limit); // FIXME
- }
- catch (Exception e)
- {
- _log.Error("Write caused exception", e);
- _protocolListener.OnException(e);
- }
- }
-
- public ByteBuffer Read()
- {
- const int bufferSize = 4 * 1024; // TODO: Prevent constant allocation of buffers.
- byte[] bytes = new byte[bufferSize];
-
- int numOctets = _networkStream.Read(bytes, 0, bytes.Length);
-
- ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes);
- byteBuffer.Limit = numOctets;
-
- byteBuffer.Flip();
-
- return byteBuffer;
- }
-
- public void Disconnect()
- {
- _networkStream.Flush();
- _networkStream.Close();
- _socket.Close();
- }
-
- public void Close()
- {
- Disconnect();
- }
-
- public IByteChannel ByteChannel
- {
- get { return _byteChannel; }
- }
- }
-}
-
-
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs index e18eefd493..607b7ca422 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs @@ -20,101 +20,119 @@ */
using System;
using System.Collections;
+using System.IO;
using System.Threading;
-using log4net;
+using Qpid.Client.Qms;
using Qpid.Client.Protocol;
using Qpid.Framing;
namespace Qpid.Client.Transport.Socket.Blocking
{
- public class BlockingSocketTransport : ITransport
- {
-// static readonly ILog _log = LogManager.GetLogger(typeof(BlockingSocketTransport));
+ /// <summary>
+ /// TCP Socket transport supporting both
+ /// SSL and non-SSL connections.
+ /// </summary>
+ public class BlockingSocketTransport : ITransport
+ {
+ // Configuration variables.
+ IProtocolListener _protocolListener;
- // Configuration variables.
- string _host;
- int _port;
- IProtocolListener _protocolListener;
+ // Runtime variables.
+ private ISocketConnector _connector;
+ private IoHandler _ioHandler;
+ private AmqpChannel _amqpChannel;
+ private ManualResetEvent _stopEvent;
- // Runtime variables.
- private BlockingSocketProcessor _socketProcessor;
- private AmqpChannel _amqpChannel;
-
- private ReaderRunner _readerRunner;
- private Thread _readerThread;
+ public IProtocolWriter ProtocolWriter
+ {
+ get { return _amqpChannel; }
+ }
+ public string LocalEndpoint
+ {
+ get { return _connector.LocalEndpoint; }
+ }
- public BlockingSocketTransport(string host, int port, AMQConnection connection)
- {
- _host = host;
- _port = port;
- _protocolListener = connection.ProtocolListener;
- }
-
- public void Open()
- {
- _socketProcessor = new BlockingSocketProcessor(_host, _port, _protocolListener);
- _socketProcessor.Connect();
- _amqpChannel = new AmqpChannel(_socketProcessor.ByteChannel);
- _readerRunner = new ReaderRunner(this);
- _readerThread = new Thread(new ThreadStart(_readerRunner.Run));
- _readerThread.Start();
- }
+
+ /// <summary>
+ /// Connect to the specified broker
+ /// </summary>
+ /// <param name="broker">The broker to connect to</param>
+ /// <param name="connection">The AMQ connection</param>
+ public void Connect(IBrokerInfo broker, AMQConnection connection)
+ {
+ _stopEvent = new ManualResetEvent(false);
+ _protocolListener = connection.ProtocolListener;
- public string getLocalEndPoint()
- {
- return _socketProcessor.getLocalEndPoint();
- }
+ _ioHandler = MakeBrokerConnection(broker, connection);
+ // todo: get default read size from config!
- public void Close()
- {
- StopReaderThread();
- _socketProcessor.Disconnect();
- }
+ _amqpChannel = new AmqpChannel(new ByteChannel(_ioHandler));
+ // post an initial async read
+ _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this);
+ }
- public IProtocolChannel ProtocolChannel { get { return _amqpChannel; } }
- public IProtocolWriter ProtocolWriter { get { return _amqpChannel; } }
+ /// <summary>
+ /// Close the broker connection
+ /// </summary>
+ public void Close()
+ {
+ StopReading();
+ CloseBrokerConnection();
+ }
- public void StopReaderThread()
- {
- _readerRunner.Stop();
- }
+ private void StopReading()
+ {
+ _stopEvent.Set();
+ }
- class ReaderRunner
- {
- BlockingSocketTransport _transport;
- bool _running = true;
+ private void CloseBrokerConnection()
+ {
+ if ( _ioHandler != null )
+ {
+ _ioHandler.Dispose();
+ _ioHandler = null;
+ }
+ if ( _connector != null )
+ {
+ _connector.Dispose();
+ _connector = null;
+ }
+ }
- public ReaderRunner(BlockingSocketTransport transport)
- {
- _transport = transport;
- }
+ private IoHandler MakeBrokerConnection(IBrokerInfo broker, AMQConnection connection)
+ {
+ if ( broker.UseSSL )
+ {
+ _connector = new SslSocketConnector();
+ } else
+ {
+ _connector = new SocketConnector();
+ }
- public void Run()
- {
- try
- {
- while (_running)
- {
- Queue frames = _transport.ProtocolChannel.Read();
+ Stream stream = _connector.Connect(broker);
+ return new IoHandler(stream, connection.ProtocolListener);
+ }
- foreach (IDataBlock dataBlock in frames)
- {
- _transport._protocolListener.OnMessage(dataBlock);
- }
- }
- }
- catch (Exception e)
- {
- _transport._protocolListener.OnException(e);
- }
- }
+ private void OnAsyncReadDone(IAsyncResult result)
+ {
+ try
+ {
+ Queue frames = _amqpChannel.EndRead(result);
+ // if we're not stopping, post a read again
+ bool stopping = _stopEvent.WaitOne(0, false);
+ if ( !stopping )
+ _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null);
- public void Stop()
+ // process results
+ foreach ( IDataBlock dataBlock in frames )
{
- // TODO: Check if this is thread safe. running is not volitile....
- _running = false;
+ _protocolListener.OnMessage(dataBlock);
}
- }
- }
+ } catch ( Exception e )
+ {
+ _protocolListener.OnException(e);
+ }
+ }
+ }
}
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs index a520815f84..ff2c301a91 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs @@ -29,16 +29,16 @@ namespace Qpid.Client.Transport.Socket.Blocking // Warning: don't use this log for regular logging.
private static readonly ILog _ioTraceLog = LogManager.GetLogger("Qpid.Client.ByteChannel.Tracing");
- BlockingSocketProcessor processor;
+ private IByteChannel _lowerChannel;
- public ByteChannel(BlockingSocketProcessor processor)
+ public ByteChannel(IByteChannel lowerChannel)
{
- this.processor = processor;
+ _lowerChannel = lowerChannel;
}
public ByteBuffer Read()
{
- ByteBuffer result = processor.Read();
+ ByteBuffer result = _lowerChannel.Read();
// TODO: Move into decorator.
if (_ioTraceLog.IsDebugEnabled)
@@ -49,6 +49,21 @@ namespace Qpid.Client.Transport.Socket.Blocking return result;
}
+ public IAsyncResult BeginRead(AsyncCallback callback, object state)
+ {
+ return _lowerChannel.BeginRead(callback, state);
+ }
+
+ public ByteBuffer EndRead(IAsyncResult result)
+ {
+ ByteBuffer buffer = _lowerChannel.EndRead(result);
+ if ( _ioTraceLog.IsDebugEnabled )
+ {
+ _ioTraceLog.Debug(String.Format("READ {0}", buffer));
+ }
+ return buffer;
+ }
+
public void Write(ByteBuffer buffer)
{
// TODO: Move into decorator.
@@ -56,8 +71,22 @@ namespace Qpid.Client.Transport.Socket.Blocking {
_ioTraceLog.Debug(String.Format("WRITE {0}", buffer));
}
-
- processor.Write(buffer);
+
+ _lowerChannel.Write(buffer);
+ }
+
+ public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state)
+ {
+ if ( _ioTraceLog.IsDebugEnabled )
+ {
+ _ioTraceLog.Debug(String.Format("WRITE {0}", buffer));
+ }
+ return _lowerChannel.BeginWrite(buffer, callback, state);
+ }
+
+ public void EndWrite(IAsyncResult result)
+ {
+ _lowerChannel.EndWrite(result);
}
- }
+ }
}
\ No newline at end of file diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs new file mode 100644 index 0000000000..ac0dc37a16 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs @@ -0,0 +1,33 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using Qpid.Client.Qms;
+
+namespace Qpid.Client.Transport.Socket.Blocking
+{
+ interface ISocketConnector : IDisposable
+ {
+ string LocalEndpoint { get; }
+ Stream Connect(IBrokerInfo broker);
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs new file mode 100644 index 0000000000..a651413581 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs @@ -0,0 +1,70 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using Qpid.Client.Qms;
+
+namespace Qpid.Client.Transport.Socket.Blocking
+{
+ /// <summary>
+ /// Implements a TCP connection over regular sockets.
+ /// </summary>
+ class SocketConnector : ISocketConnector
+ {
+ private MyTcpClient _tcpClient;
+
+ public string LocalEndpoint
+ {
+ get { return _tcpClient.LocalEndpoint.ToString(); }
+ }
+
+ public Stream Connect(IBrokerInfo broker)
+ {
+ _tcpClient = new MyTcpClient(broker.Host, broker.Port);
+ return _tcpClient.GetStream();
+ }
+
+ public void Dispose()
+ {
+ if ( _tcpClient != null )
+ {
+ _tcpClient.Close();
+ _tcpClient = null;
+ }
+ }
+
+ class MyTcpClient : TcpClient
+ {
+ public MyTcpClient(string host, int port)
+ : base(host, port)
+ {
+ }
+
+ public EndPoint LocalEndpoint
+ {
+ get { return Client.LocalEndPoint; }
+ }
+ }
+
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs new file mode 100644 index 0000000000..24c3f5bcb8 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs @@ -0,0 +1,107 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.IO;
+using System.Net;
+using log4net;
+using Qpid.Client.Qms;
+using Org.Mentalis.Security.Ssl;
+using MCertificate = Org.Mentalis.Security.Certificates.Certificate;
+using MCertificateChain = Org.Mentalis.Security.Certificates.CertificateChain;
+
+namespace Qpid.Client.Transport.Socket.Blocking
+{
+ /// <summary>
+ /// Implements a TLS v1.0 connection using the Mentalis.org library
+ /// </summary>
+ /// <remarks>
+ /// It would've been easier to implement this at the StreamFilter
+ /// level, but unfortunately the Mentalis library doesn't support
+ /// a passthrough SSL stream class and is tied directly
+ /// to socket-like classes.
+ /// </remarks>
+ class SslSocketConnector : ISocketConnector
+ {
+ private static ILog _logger = LogManager.GetLogger(typeof(SslSocketConnector));
+ private MyTcpClient _tcpClient;
+
+ public string LocalEndpoint
+ {
+ get { return _tcpClient.LocalEndpoint.ToString(); }
+ }
+
+ public Stream Connect(IBrokerInfo broker)
+ {
+ MCertificate cert = GetClientCert(broker);
+ SecurityOptions options = new SecurityOptions(
+ SecureProtocol.Tls1, cert, ConnectionEnd.Client
+ );
+ if ( broker.SslOptions != null
+ && broker.SslOptions.IgnoreValidationErrors )
+ {
+ _logger.Warn("Ignoring any certificate validation errors during SSL handshake...");
+ options.VerificationType = CredentialVerification.None;
+ }
+
+ _tcpClient = new MyTcpClient(broker.Host, broker.Port, options);
+ return _tcpClient.GetStream();
+ }
+
+ public void Dispose()
+ {
+ if ( _tcpClient != null )
+ {
+ _tcpClient.Close();
+ _tcpClient = null;
+ }
+ }
+
+ private static MCertificate GetClientCert(IBrokerInfo broker)
+ {
+ // if a client certificate is configured,
+ // use that to enable mutual authentication
+ MCertificate cert = null;
+ if ( broker.SslOptions != null
+ && broker.SslOptions.ClientCertificate != null )
+ {
+ cert = MCertificate.CreateFromX509Certificate(
+ broker.SslOptions.ClientCertificate
+ );
+ _logger.DebugFormat("Using Client Certificate for SSL '{0}'", cert.ToString(true));
+ }
+ return cert;
+ }
+
+ class MyTcpClient : SecureTcpClient
+ {
+ public MyTcpClient(string host, int port, SecurityOptions options)
+ : base(host, port, options)
+ {
+ }
+
+ public EndPoint LocalEndpoint
+ {
+ get { return Client.LocalEndPoint; }
+ }
+
+ }
+
+ }
+}
|
