summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client
diff options
context:
space:
mode:
authorTomas Restrepo <tomasr@apache.org>2007-05-10 23:02:46 +0000
committerTomas Restrepo <tomasr@apache.org>2007-05-10 23:02:46 +0000
commitc10d31cbbbed7b2997816cb9d296c679073b8aa5 (patch)
treedeb4f80768418007e4fa871d231f4d5620905d9b /dotnet/Qpid.Client/Client
parent29abd268f7005cf0a952d9c77cf1bbca28d49f28 (diff)
downloadqpid-python-c10d31cbbbed7b2997816cb9d296c679073b8aa5.tar.gz
Merged revisions 537015-537026 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r537015 | tomasr | 2007-05-10 17:16:49 -0500 (Thu, 10 May 2007) | 1 line QPID-435: Fix HeadersExchangeTest ........ r537019 | tomasr | 2007-05-10 17:25:01 -0500 (Thu, 10 May 2007) | 1 line QPID-441 Fix handling of bounced messages ........ r537026 | tomasr | 2007-05-10 17:46:46 -0500 (Thu, 10 May 2007) | 1 line QPID-398 SSL support for .NET client ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@537031 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client')
-rw-r--r--dotnet/Qpid.Client/Client/AMQConnection.cs8
-rw-r--r--dotnet/Qpid.Client/Client/AMQNoConsumersException.cs45
-rw-r--r--dotnet/Qpid.Client/Client/AMQNoRouteException.cs46
-rw-r--r--dotnet/Qpid.Client/Client/AmqBrokerInfo.cs21
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs48
-rw-r--r--dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs3
-rw-r--r--dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs2
-rw-r--r--dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs15
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs2
-rw-r--r--dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs2
-rw-r--r--dotnet/Qpid.Client/Client/SslOptions.cs81
-rw-r--r--dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs48
-rw-r--r--dotnet/Qpid.Client/Client/Transport/IByteChannel.cs51
-rw-r--r--dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs3
-rw-r--r--dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs38
-rw-r--r--dotnet/Qpid.Client/Client/Transport/ITransport.cs5
-rw-r--r--dotnet/Qpid.Client/Client/Transport/IoHandler.cs321
-rw-r--r--dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs117
-rw-r--r--dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs170
-rw-r--r--dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs43
-rw-r--r--dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs33
-rw-r--r--dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs70
-rw-r--r--dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs107
23 files changed, 1047 insertions, 232 deletions
diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs
index a59670ef5a..da8498f938 100644
--- a/dotnet/Qpid.Client/Client/AMQConnection.cs
+++ b/dotnet/Qpid.Client/Client/AMQConnection.cs
@@ -672,9 +672,9 @@ namespace Qpid.Client
}
}
- public bool AttemptReconnection(String host, int port, bool useSSL)
+ public bool AttemptReconnection(String host, int port, SslOptions sslConfig)
{
- IBrokerInfo bd = new AmqBrokerInfo("amqp", host, port, useSSL);
+ IBrokerInfo bd = new AmqBrokerInfo("amqp", host, port, sslConfig);
_failoverPolicy.setBroker(bd);
@@ -708,10 +708,10 @@ namespace Qpid.Client
_transport = LoadTransportFromAssembly(brokerDetail.getHost(), brokerDetail.getPort(), assemblyName, transportType);
*/
- _transport = new BlockingSocketTransport(brokerDetail.Host, brokerDetail.Port, this);
+ _transport = new BlockingSocketTransport();
// Connect.
- _transport.Open();
+ _transport.Connect(brokerDetail, this);
_protocolWriter = new ProtocolWriter(_transport.ProtocolWriter, _protocolListener);
_protocolSession = new AMQProtocolSession(_transport.ProtocolWriter, _transport, this);
_protocolListener.ProtocolSession = _protocolSession;
diff --git a/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs b/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs
new file mode 100644
index 0000000000..ec5944bdac
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+using Qpid.Common;
+using Qpid.Protocol;
+
+namespace Qpid.Client
+{
+ [Serializable]
+ public class AMQNoConsumersException : AMQUndeliveredException
+ {
+ public AMQNoConsumersException(string message)
+ : this(message, null)
+ {
+ }
+
+ public AMQNoConsumersException(string message, object bounced)
+ : base(AMQConstant.NO_CONSUMERS.Code, message, bounced)
+ {
+ }
+ protected AMQNoConsumersException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/AMQNoRouteException.cs b/dotnet/Qpid.Client/Client/AMQNoRouteException.cs
new file mode 100644
index 0000000000..8f0db1c3d5
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/AMQNoRouteException.cs
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+using Qpid.Common;
+using Qpid.Protocol;
+
+namespace Qpid.Client
+{
+ [Serializable]
+ public class AMQNoRouteException : AMQUndeliveredException
+ {
+ public AMQNoRouteException(string message)
+ : this(message, null)
+ {
+ }
+
+ public AMQNoRouteException(string message, object bounced)
+ : base(AMQConstant.NO_ROUTE.Code, message, bounced)
+ {
+ }
+
+ protected AMQNoRouteException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs b/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs
index f26756ccad..9ae1a49473 100644
--- a/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs
+++ b/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs
@@ -36,6 +36,7 @@ namespace Qpid.Client
private int _port = 5672;
private string _transport = "amqp";
private Hashtable _options = new Hashtable();
+ private SslOptions _sslOptions;
public AmqBrokerInfo()
{
@@ -182,6 +183,21 @@ namespace Qpid.Client
}
}
+ public AmqBrokerInfo(string transport, string host, int port, SslOptions sslConfig)
+ : this()
+ {
+ _transport = transport;
+ _host = host;
+ _port = port;
+
+ if ( sslConfig != null )
+ {
+ SetOption(BrokerInfoConstants.OPTIONS_SSL, "true");
+ _sslOptions = sslConfig;
+ }
+ }
+
+
public string Host
{
get { return _host; }
@@ -200,6 +216,11 @@ namespace Qpid.Client
set { _transport = value; }
}
+ public SslOptions SslOptions
+ {
+ get { return _sslOptions; }
+ }
+
public string GetOption(string key)
{
return (string)_options[key];
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
index 07650c170b..3471ac3640 100644
--- a/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -28,6 +28,7 @@ using Qpid.Client.Message;
using Qpid.Collections;
using Qpid.Framing;
using Qpid.Messaging;
+using Qpid.Protocol;
namespace Qpid.Client
{
@@ -568,8 +569,14 @@ namespace Qpid.Client
if (_logger.IsDebugEnabled)
{
_logger.Debug("Message received in session with channel id " + _channelId);
- }
- _queue.EnqueueBlocking(message);
+ }
+ if ( message.DeliverBody == null )
+ {
+ ReturnBouncedMessage(message);
+ } else
+ {
+ _queue.EnqueueBlocking(message);
+ }
}
public int DefaultPrefetch
@@ -986,5 +993,42 @@ namespace Qpid.Client
// FIXME: lock FailoverMutex here?
_connection.ProtocolWriter.Write(ackFrame);
}
+
+ /// <summary>
+ /// Handle a message that bounced from the server, creating
+ /// the corresponding exception and notifying the connection about it
+ /// </summary>
+ /// <param name="message">Unprocessed message</param>
+ private void ReturnBouncedMessage(UnprocessedMessage message)
+ {
+ try
+ {
+ AbstractQmsMessage bouncedMessage =
+ _messageFactoryRegistry.CreateMessage(
+ 0, false, message.ContentHeader,
+ message.Bodies
+ );
+
+ int errorCode = message.BounceBody.ReplyCode;
+ string reason = message.BounceBody.ReplyText;
+ _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")");
+ AMQException exception;
+ if ( errorCode == AMQConstant.NO_CONSUMERS.Code )
+ {
+ exception = new AMQNoConsumersException(reason, bouncedMessage);
+ } else if ( errorCode == AMQConstant.NO_ROUTE.Code )
+ {
+ exception = new AMQNoRouteException(reason, bouncedMessage);
+ } else
+ {
+ exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage);
+ }
+ _connection.ExceptionReceived(exception);
+ } catch ( Exception ex )
+ {
+ _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex);
+ }
+
+ }
}
}
diff --git a/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs b/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs
index aa79749b41..dbd09da49c 100644
--- a/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs
+++ b/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs
@@ -97,7 +97,8 @@ namespace Qpid.Client.Failover
// if _host has value then we are performing a redirect.
if (_host != null)
{
- failoverSucceeded = _connection.AttemptReconnection(_host, _port, false);
+ // todo: fix SSL support!
+ failoverSucceeded = _connection.AttemptReconnection(_host, _port, null);
}
else
{
diff --git a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
index 78526f906f..0bd65a1ace 100644
--- a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
+++ b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
@@ -32,7 +32,7 @@ namespace Qpid.Client.Handler
public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
{
- _logger.Debug("New JmsBounce method received");
+ _logger.Debug("New Basic.Return method received");
UnprocessedMessage msg = new UnprocessedMessage();
msg.DeliverBody = null;
msg.BounceBody = (BasicReturnBody) evt.Method;
diff --git a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
index 0ce8a393c9..7f88dd8219 100644
--- a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
+++ b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
@@ -44,11 +44,20 @@ namespace Qpid.Client.Handler
AMQFrame frame = ChannelCloseOkBody.CreateAMQFrame(evt.ChannelId);
evt.ProtocolSession.WriteFrame(frame);
- // HACK
+
if ( errorCode != AMQConstant.REPLY_SUCCESS.Code )
{
- _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception");
- evt.ProtocolSession.AMQConnection.ExceptionReceived(new AMQChannelClosedException(errorCode, "Error: " + reason));
+ _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception");
+ if ( errorCode == AMQConstant.NO_CONSUMERS.Code )
+ throw new AMQNoConsumersException(reason);
+ if ( errorCode == AMQConstant.NO_ROUTE.Code )
+ throw new AMQNoRouteException(reason);
+ if ( errorCode == AMQConstant.INVALID_ARGUMENT.Code )
+ throw new AMQInvalidArgumentException(reason);
+ if ( errorCode == AMQConstant.INVALID_ROUTING_KEY.Code )
+ throw new AMQInvalidRoutingKeyException(reason);
+ // any other
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason);
}
evt.ProtocolSession.ChannelClosed(evt.ChannelId, errorCode, reason);
}
diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
index 42169f31b3..0ca443e3bb 100644
--- a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
+++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
@@ -271,7 +271,7 @@ namespace Qpid.Client.Protocol
id = _queueId++;
}
- return "tmp_" + _connection.Transport.getLocalEndPoint() + "_" + id;
+ return "tmp_" + _connection.Transport.LocalEndpoint + "_" + id;
}
}
}
diff --git a/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs b/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs
index 78f13c9f42..944d21ad92 100644
--- a/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs
+++ b/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs
@@ -92,6 +92,8 @@ namespace Qpid.Client.Security
if ( _mechanism2HandlerMap == null )
_mechanism2HandlerMap = new Hashtable();
+ if ( !_mechanism2HandlerMap.Contains(ExternalSaslClient.Mechanism) )
+ _mechanism2HandlerMap.Add(ExternalSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
if ( !_mechanism2HandlerMap.Contains(CramMD5SaslClient.Mechanism) )
_mechanism2HandlerMap.Add(CramMD5SaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
if ( !_mechanism2HandlerMap.Contains(PlainSaslClient.Mechanism) )
diff --git a/dotnet/Qpid.Client/Client/SslOptions.cs b/dotnet/Qpid.Client/Client/SslOptions.cs
new file mode 100644
index 0000000000..a6488d99ea
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/SslOptions.cs
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Security.Cryptography.X509Certificates;
+
+namespace Qpid.Client
+{
+ /// <summary>
+ /// Configures SSL-related options to connect to an AMQP broker.
+ /// </summary>
+ /// <remarks>
+ /// If the server certificate is not trusted by the client,
+ /// connection will fail. However, you can set the
+ /// <see cref="IgnoreValidationErrors"/> property to true
+ /// to ignore any certificate verification errors for debugging purposes.
+ /// </remarks>
+ public class SslOptions
+ {
+ private X509Certificate _clientCertificate;
+ private bool _ignoreValidationErrors;
+
+ /// <summary>
+ /// Certificate to present to the broker to authenticate
+ /// this client connection
+ /// </summary>
+ public X509Certificate ClientCertificate
+ {
+ get { return _clientCertificate; }
+ }
+
+ /// <summary>
+ /// If true, the validity of the broker certificate
+ /// will not be verified on connection
+ /// </summary>
+ public bool IgnoreValidationErrors
+ {
+ get { return _ignoreValidationErrors; }
+ }
+
+ /// <summary>
+ /// Initialize a new instance with default values
+ /// (No client certificate, don't ignore validation errors)
+ /// </summary>
+ public SslOptions()
+ {
+ }
+
+ /// <summary>
+ /// Initialize a new instance
+ /// </summary>
+ /// <param name="clientCertificate">
+ /// Certificate to use to authenticate the client to the broker
+ /// </param>
+ /// <param name="ignoreValidationErrors">
+ /// If true, ignore any validation errors when validating the server certificate
+ /// </param>
+ public SslOptions(X509Certificate clientCertificate, bool ignoreValidationErrors)
+ {
+ _clientCertificate = clientCertificate;
+ _ignoreValidationErrors = ignoreValidationErrors;
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs b/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
index d3add546fe..ca7ffad8b3 100644
--- a/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
+++ b/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
@@ -50,19 +50,18 @@ namespace Qpid.Client.Transport
public Queue Read()
{
ByteBuffer buffer = byteChannel.Read();
+ return DecodeAndTrace(buffer);
+ }
+
+ public IAsyncResult BeginRead(AsyncCallback callback, object state)
+ {
+ return byteChannel.BeginRead(callback, state);
+ }
- Queue frames = Decode(buffer);
-
- // TODO: Refactor to decorator.
- if (_protocolTraceLog.IsDebugEnabled)
- {
- foreach (object o in frames)
- {
- _protocolTraceLog.Debug(String.Format("READ {0}", o));
- }
- }
-
- return frames;
+ public Queue EndRead(IAsyncResult result)
+ {
+ ByteBuffer buffer = byteChannel.EndRead(result);
+ return DecodeAndTrace(buffer);
}
public void Write(IDataBlock o)
@@ -72,10 +71,33 @@ namespace Qpid.Client.Transport
{
_protocolTraceLog.Debug(String.Format("WRITE {0}", o));
}
-
+ // we should be doing an async write, but apparently
+ // the mentalis library doesn't queue async read/writes
+ // correctly and throws random IOException's. Stay sync for a while
+ //byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null);
byteChannel.Write(Encode(o));
}
+ private void OnAsyncWriteDone(IAsyncResult result)
+ {
+ byteChannel.EndWrite(result);
+ }
+
+ private Queue DecodeAndTrace(ByteBuffer buffer)
+ {
+ Queue frames = Decode(buffer);
+
+ // TODO: Refactor to decorator.
+ if ( _protocolTraceLog.IsDebugEnabled )
+ {
+ foreach ( object o in frames )
+ {
+ _protocolTraceLog.Debug(String.Format("READ {0}", o));
+ }
+ }
+ return frames;
+ }
+
private ByteBuffer Encode(object o)
{
SingleProtocolEncoderOutput output = new SingleProtocolEncoderOutput();
diff --git a/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs b/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs
index 4a3ff385a8..0f8f341d48 100644
--- a/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs
+++ b/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs
@@ -18,13 +18,54 @@
* under the License.
*
*/
+using System;
using Qpid.Buffer;
namespace Qpid.Client.Transport
{
- public interface IByteChannel
- {
- ByteBuffer Read();
- void Write(ByteBuffer buffer);
- }
+ /// <summary>
+ /// Represents input/output channels that read
+ /// and write <see cref="ByteBuffer"/> instances
+ /// </summary>
+ public interface IByteChannel
+ {
+ /// <summary>
+ /// Read a <see cref="ByteBuffer"/> from the underlying
+ /// network stream and any configured filters
+ /// </summary>
+ /// <returns>A ByteBuffer, if available</returns>
+ ByteBuffer Read();
+ /// <summary>
+ /// Begin an asynchronous read operation
+ /// </summary>
+ /// <param name="callback">Callback method to call when read operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ IAsyncResult BeginRead(AsyncCallback callback, object state);
+ /// <summary>
+ /// End an asynchronous read operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param>
+ /// <returns>The <see cref="ByteBuffer"/> read</returns>
+ ByteBuffer EndRead(IAsyncResult result);
+ /// <summary>
+ /// Write a <see cref="ByteBuffer"/> to the underlying network
+ /// stream, going through any configured filters
+ /// </summary>
+ /// <param name="buffer"></param>
+ void Write(ByteBuffer buffer);
+ /// <summary>
+ /// Begin an asynchronous write operation
+ /// </summary>
+ /// <param name="buffer">Buffer to write</param>
+ /// <param name="callback">A callback to call when the operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state);
+ /// <summary>
+ /// End an asynchronous write operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param>
+ void EndWrite(IAsyncResult result);
+ }
}
diff --git a/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs b/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
index 4943a45d68..0379e582d6 100644
--- a/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
+++ b/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
@@ -18,6 +18,7 @@
* under the License.
*
*/
+using System;
using System.Collections;
namespace Qpid.Client.Transport
@@ -25,5 +26,7 @@ namespace Qpid.Client.Transport
public interface IProtocolChannel : IProtocolWriter
{
Queue Read();
+ IAsyncResult BeginRead(AsyncCallback callback, object state);
+ Queue EndRead(IAsyncResult result);
}
}
diff --git a/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs b/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs
new file mode 100644
index 0000000000..409b428c01
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.IO;
+
+namespace Qpid.Client.Transport
+{
+ /// <summary>
+ /// Defines a way to introduce an arbitrary filtering
+ /// stream into the stream chain managed by <see cref="IoHandler"/>
+ /// </summary>
+ public interface IStreamFilter
+ {
+ /// <summary>
+ /// Creates a new filtering stream on top of another
+ /// </summary>
+ /// <param name="lowerStream">Next stream on the stack</param>
+ /// <returns>A new filtering stream</returns>
+ Stream CreateFilterStream(Stream lowerStream);
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Transport/ITransport.cs b/dotnet/Qpid.Client/Client/Transport/ITransport.cs
index aebe58b439..3d918693bc 100644
--- a/dotnet/Qpid.Client/Client/Transport/ITransport.cs
+++ b/dotnet/Qpid.Client/Client/Transport/ITransport.cs
@@ -18,14 +18,15 @@
* under the License.
*
*/
+using Qpid.Client.Qms;
using Qpid.Client.Protocol;
namespace Qpid.Client.Transport
{
public interface ITransport : IConnectionCloser
{
- void Open();
- string getLocalEndPoint();
+ void Connect(IBrokerInfo broker, AMQConnection connection);
+ string LocalEndpoint { get; }
IProtocolWriter ProtocolWriter { get; }
}
}
diff --git a/dotnet/Qpid.Client/Client/Transport/IoHandler.cs b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs
new file mode 100644
index 0000000000..8d1f04f662
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs
@@ -0,0 +1,321 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using System.Threading;
+using log4net;
+using Qpid.Buffer;
+using Qpid.Client.Protocol;
+
+namespace Qpid.Client.Transport
+{
+ /// <summary>
+ /// Responsible for reading and writing
+ /// ByteBuffers from/to network streams, and handling
+ /// the stream filters
+ /// </summary>
+ public class IoHandler : IByteChannel, IDisposable
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(IoHandler));
+ private const int DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+ private Stream _topStream;
+ private IProtocolListener _protocolListener;
+ private int _readBufferSize;
+
+ public int ReadBufferSize
+ {
+ get { return _readBufferSize; }
+ set { _readBufferSize = value; }
+ }
+
+ /// <summary>
+ /// Initialize a new instance
+ /// </summary>
+ /// <param name="stream">Underlying network stream</param>
+ /// <param name="protocolListener">Protocol listener to report exceptions to</param>
+ public IoHandler(Stream stream, IProtocolListener protocolListener)
+ {
+ if ( stream == null )
+ throw new ArgumentNullException("stream");
+ if ( protocolListener == null )
+ throw new ArgumentNullException("protocolListener");
+
+ // initially, the stream at the top of the filter
+ // chain is the underlying network stream
+ _topStream = stream;
+ _protocolListener = protocolListener;
+ _readBufferSize = DEFAULT_BUFFER_SIZE;
+ }
+
+ /// <summary>
+ /// Adds a new filter on the top of the chain
+ /// </summary>
+ /// <param name="filter">Stream filter to put on top of the chain</param>
+ /// <remarks>
+ /// This should *only* be called during initialization. We don't
+ /// support changing the filter change after the first read/write
+ /// has been done and it's not thread-safe to boot!
+ /// </remarks>
+ public void AddFilter(IStreamFilter filter)
+ {
+ _topStream = filter.CreateFilterStream(_topStream);
+ }
+
+ #region IByteChannel Implementation
+ //
+ // IByteChannel Implementation
+ //
+
+ /// <summary>
+ /// Read a <see cref="ByteBuffer"/> from the underlying
+ /// network stream and any configured filters
+ /// </summary>
+ /// <returns>A ByteBuffer, if available</returns>
+ public ByteBuffer Read()
+ {
+ byte[] bytes = AllocateBuffer();
+
+ int numOctets = _topStream.Read(bytes, 0, bytes.Length);
+
+ return WrapByteArray(bytes, numOctets);
+ }
+
+ /// <summary>
+ /// Begin an asynchronous read operation
+ /// </summary>
+ /// <param name="callback">Callback method to call when read operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ public IAsyncResult BeginRead(AsyncCallback callback, object state)
+ {
+ byte[] bytes = AllocateBuffer();
+ ReadData rd = new ReadData(callback, state, bytes);
+
+ // only put a callback if the caller wants one.
+ AsyncCallback myCallback = null;
+ if ( callback != null )
+ myCallback = new AsyncCallback(OnAsyncReadDone);
+
+ IAsyncResult result = _topStream.BeginRead(
+ bytes, 0, bytes.Length, myCallback,rd
+ );
+ return new WrappedAsyncResult(result, bytes);
+ }
+
+ /// <summary>
+ /// End an asynchronous read operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param>
+ /// <returns>The <see cref="ByteBuffer"/> read</returns>
+ public ByteBuffer EndRead(IAsyncResult result)
+ {
+ WrappedAsyncResult theResult = (WrappedAsyncResult)result;
+ int bytesRead = _topStream.EndRead(theResult.InnerResult);
+ return WrapByteArray(theResult.Buffer, bytesRead);
+ }
+
+ /// <summary>
+ /// Write a <see cref="ByteBuffer"/> to the underlying network
+ /// stream, going through any configured filters
+ /// </summary>
+ /// <param name="buffer"></param>
+ public void Write(ByteBuffer buffer)
+ {
+ try
+ {
+ _topStream.Write(buffer.Array, buffer.Position, buffer.Limit); // FIXME
+ } catch ( Exception e )
+ {
+ _log.Error("Write caused exception", e);
+ _protocolListener.OnException(e);
+ }
+ }
+
+ /// <summary>
+ /// Begin an asynchronous write operation
+ /// </summary>
+ /// <param name="buffer">Buffer to write</param>
+ /// <param name="callback">A callback to call when the operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state)
+ {
+ try
+ {
+ return _topStream.BeginWrite(
+ buffer.Array, buffer.Position, buffer.Limit,
+ callback, state
+ );
+ } catch ( Exception e )
+ {
+ _log.Error("BeginWrite caused exception", e);
+ // not clear if an exception here should be propagated? we still
+ // need to propagate it upwards anyway!
+ _protocolListener.OnException(e);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// End an asynchronous write operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param>
+ public void EndWrite(IAsyncResult result)
+ {
+ try
+ {
+ _topStream.EndWrite(result);
+ } catch ( Exception e )
+ {
+ _log.Error("EndWrite caused exception", e);
+ // not clear if an exception here should be propagated?
+ _protocolListener.OnException(e);
+ //throw;
+ }
+ }
+ #endregion // IByteChannel Implementation
+
+ #region IDisposable Implementation
+ //
+ // IDisposable Implementation
+ //
+
+ public void Dispose()
+ {
+ if ( _topStream != null )
+ {
+ _topStream.Close();
+ }
+ }
+
+ #endregion // IDisposable Implementation
+
+ #region Private and Helper Classes/Methods
+ //
+ // Private and Helper Classes/Methods
+ //
+
+ private byte[] AllocateBuffer()
+ {
+ return new byte[ReadBufferSize];
+ }
+
+ private static ByteBuffer WrapByteArray(byte[] bytes, int size)
+ {
+ ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes);
+ byteBuffer.Limit = size;
+ byteBuffer.Flip();
+
+ return byteBuffer;
+ }
+
+
+ private static void OnAsyncReadDone(IAsyncResult result)
+ {
+ ReadData rd = (ReadData) result.AsyncState;
+ IAsyncResult wrapped = new WrappedAsyncResult(result, rd.Buffer);
+ rd.Callback(wrapped);
+ }
+
+ class ReadData
+ {
+ private object _state;
+ private AsyncCallback _callback;
+ private byte[] _buffer;
+
+ public object State
+ {
+ get { return _state; }
+ }
+
+ public AsyncCallback Callback
+ {
+ get { return _callback; }
+ }
+
+ public byte[] Buffer
+ {
+ get { return _buffer; }
+ }
+
+ public ReadData(AsyncCallback callback, object state, byte[] buffer)
+ {
+ _callback = callback;
+ _state = state;
+ _buffer = buffer;
+ }
+ }
+
+ class WrappedAsyncResult : IAsyncResult
+ {
+ private IAsyncResult _innerResult;
+ private byte[] _buffer;
+
+ #region IAsyncResult Properties
+ //
+ // IAsyncResult Properties
+ //
+ public bool IsCompleted
+ {
+ get { return _innerResult.IsCompleted; }
+ }
+
+ public WaitHandle AsyncWaitHandle
+ {
+ get { return _innerResult.AsyncWaitHandle; }
+ }
+
+ public object AsyncState
+ {
+ get { return _innerResult.AsyncState; }
+ }
+
+ public bool CompletedSynchronously
+ {
+ get { return _innerResult.CompletedSynchronously; }
+ }
+ #endregion // IAsyncResult Properties
+
+ public IAsyncResult InnerResult
+ {
+ get { return _innerResult; }
+ }
+ public byte[] Buffer
+ {
+ get { return _buffer; }
+ }
+
+ public WrappedAsyncResult(IAsyncResult result, byte[] buffer)
+ {
+ if ( result == null )
+ throw new ArgumentNullException("result");
+ if ( buffer == null )
+ throw new ArgumentNullException("buffer");
+
+ _innerResult = result;
+ _buffer = buffer;
+ }
+ }
+
+ #endregion // Private and Helper Classes/Methods
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs
index 7a9ead0c06..e69de29bb2 100644
--- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs
+++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs
@@ -1,117 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Net;
-using System.Net.Sockets;
-using log4net;
-using Qpid.Buffer;
-using Qpid.Client.Protocol;
-
-namespace Qpid.Client.Transport.Socket.Blocking
-{
- class BlockingSocketProcessor : IConnectionCloser
- {
- private static readonly ILog _log = LogManager.GetLogger(typeof(BlockingSocketProcessor));
-
- string _host;
- int _port;
- System.Net.Sockets.Socket _socket;
- private NetworkStream _networkStream;
- IByteChannel _byteChannel;
- IProtocolListener _protocolListener;
-
- public BlockingSocketProcessor(string host, int port, IProtocolListener protocolListener)
- {
- _host = host;
- _port = port;
- _protocolListener = protocolListener;
- _byteChannel = new ByteChannel(this);
- }
-
- /// <summary>
- /// Synchronous blocking connect.
- /// </summary>
- public void Connect()
- {
- _socket = new System.Net.Sockets.Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
-
- IPHostEntry ipHostInfo = Dns.Resolve(_host); // Note: don't fix this warning. We do this for .NET 1.1 compatibility.
- IPAddress ipAddress = ipHostInfo.AddressList[0];
-
- IPEndPoint ipe = new IPEndPoint(ipAddress, _port);
-
- _socket.Connect(ipe);
- _networkStream = new NetworkStream(_socket, true);
- }
-
- public string getLocalEndPoint()
- {
- return _socket.LocalEndPoint.ToString();
- }
-
- public void Write(ByteBuffer byteBuffer)
- {
- try
- {
- _networkStream.Write(byteBuffer.Array, byteBuffer.Position, byteBuffer.Limit); // FIXME
- }
- catch (Exception e)
- {
- _log.Error("Write caused exception", e);
- _protocolListener.OnException(e);
- }
- }
-
- public ByteBuffer Read()
- {
- const int bufferSize = 4 * 1024; // TODO: Prevent constant allocation of buffers.
- byte[] bytes = new byte[bufferSize];
-
- int numOctets = _networkStream.Read(bytes, 0, bytes.Length);
-
- ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes);
- byteBuffer.Limit = numOctets;
-
- byteBuffer.Flip();
-
- return byteBuffer;
- }
-
- public void Disconnect()
- {
- _networkStream.Flush();
- _networkStream.Close();
- _socket.Close();
- }
-
- public void Close()
- {
- Disconnect();
- }
-
- public IByteChannel ByteChannel
- {
- get { return _byteChannel; }
- }
- }
-}
-
-
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
index e18eefd493..607b7ca422 100644
--- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
+++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
@@ -20,101 +20,119 @@
*/
using System;
using System.Collections;
+using System.IO;
using System.Threading;
-using log4net;
+using Qpid.Client.Qms;
using Qpid.Client.Protocol;
using Qpid.Framing;
namespace Qpid.Client.Transport.Socket.Blocking
{
- public class BlockingSocketTransport : ITransport
- {
-// static readonly ILog _log = LogManager.GetLogger(typeof(BlockingSocketTransport));
+ /// <summary>
+ /// TCP Socket transport supporting both
+ /// SSL and non-SSL connections.
+ /// </summary>
+ public class BlockingSocketTransport : ITransport
+ {
+ // Configuration variables.
+ IProtocolListener _protocolListener;
- // Configuration variables.
- string _host;
- int _port;
- IProtocolListener _protocolListener;
+ // Runtime variables.
+ private ISocketConnector _connector;
+ private IoHandler _ioHandler;
+ private AmqpChannel _amqpChannel;
+ private ManualResetEvent _stopEvent;
- // Runtime variables.
- private BlockingSocketProcessor _socketProcessor;
- private AmqpChannel _amqpChannel;
-
- private ReaderRunner _readerRunner;
- private Thread _readerThread;
+ public IProtocolWriter ProtocolWriter
+ {
+ get { return _amqpChannel; }
+ }
+ public string LocalEndpoint
+ {
+ get { return _connector.LocalEndpoint; }
+ }
- public BlockingSocketTransport(string host, int port, AMQConnection connection)
- {
- _host = host;
- _port = port;
- _protocolListener = connection.ProtocolListener;
- }
-
- public void Open()
- {
- _socketProcessor = new BlockingSocketProcessor(_host, _port, _protocolListener);
- _socketProcessor.Connect();
- _amqpChannel = new AmqpChannel(_socketProcessor.ByteChannel);
- _readerRunner = new ReaderRunner(this);
- _readerThread = new Thread(new ThreadStart(_readerRunner.Run));
- _readerThread.Start();
- }
+
+ /// <summary>
+ /// Connect to the specified broker
+ /// </summary>
+ /// <param name="broker">The broker to connect to</param>
+ /// <param name="connection">The AMQ connection</param>
+ public void Connect(IBrokerInfo broker, AMQConnection connection)
+ {
+ _stopEvent = new ManualResetEvent(false);
+ _protocolListener = connection.ProtocolListener;
- public string getLocalEndPoint()
- {
- return _socketProcessor.getLocalEndPoint();
- }
+ _ioHandler = MakeBrokerConnection(broker, connection);
+ // todo: get default read size from config!
- public void Close()
- {
- StopReaderThread();
- _socketProcessor.Disconnect();
- }
+ _amqpChannel = new AmqpChannel(new ByteChannel(_ioHandler));
+ // post an initial async read
+ _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this);
+ }
- public IProtocolChannel ProtocolChannel { get { return _amqpChannel; } }
- public IProtocolWriter ProtocolWriter { get { return _amqpChannel; } }
+ /// <summary>
+ /// Close the broker connection
+ /// </summary>
+ public void Close()
+ {
+ StopReading();
+ CloseBrokerConnection();
+ }
- public void StopReaderThread()
- {
- _readerRunner.Stop();
- }
+ private void StopReading()
+ {
+ _stopEvent.Set();
+ }
- class ReaderRunner
- {
- BlockingSocketTransport _transport;
- bool _running = true;
+ private void CloseBrokerConnection()
+ {
+ if ( _ioHandler != null )
+ {
+ _ioHandler.Dispose();
+ _ioHandler = null;
+ }
+ if ( _connector != null )
+ {
+ _connector.Dispose();
+ _connector = null;
+ }
+ }
- public ReaderRunner(BlockingSocketTransport transport)
- {
- _transport = transport;
- }
+ private IoHandler MakeBrokerConnection(IBrokerInfo broker, AMQConnection connection)
+ {
+ if ( broker.UseSSL )
+ {
+ _connector = new SslSocketConnector();
+ } else
+ {
+ _connector = new SocketConnector();
+ }
- public void Run()
- {
- try
- {
- while (_running)
- {
- Queue frames = _transport.ProtocolChannel.Read();
+ Stream stream = _connector.Connect(broker);
+ return new IoHandler(stream, connection.ProtocolListener);
+ }
- foreach (IDataBlock dataBlock in frames)
- {
- _transport._protocolListener.OnMessage(dataBlock);
- }
- }
- }
- catch (Exception e)
- {
- _transport._protocolListener.OnException(e);
- }
- }
+ private void OnAsyncReadDone(IAsyncResult result)
+ {
+ try
+ {
+ Queue frames = _amqpChannel.EndRead(result);
+ // if we're not stopping, post a read again
+ bool stopping = _stopEvent.WaitOne(0, false);
+ if ( !stopping )
+ _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null);
- public void Stop()
+ // process results
+ foreach ( IDataBlock dataBlock in frames )
{
- // TODO: Check if this is thread safe. running is not volitile....
- _running = false;
+ _protocolListener.OnMessage(dataBlock);
}
- }
- }
+ } catch ( Exception e )
+ {
+ _protocolListener.OnException(e);
+ }
+ }
+ }
}
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs
index a520815f84..ff2c301a91 100644
--- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs
+++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs
@@ -29,16 +29,16 @@ namespace Qpid.Client.Transport.Socket.Blocking
// Warning: don't use this log for regular logging.
private static readonly ILog _ioTraceLog = LogManager.GetLogger("Qpid.Client.ByteChannel.Tracing");
- BlockingSocketProcessor processor;
+ private IByteChannel _lowerChannel;
- public ByteChannel(BlockingSocketProcessor processor)
+ public ByteChannel(IByteChannel lowerChannel)
{
- this.processor = processor;
+ _lowerChannel = lowerChannel;
}
public ByteBuffer Read()
{
- ByteBuffer result = processor.Read();
+ ByteBuffer result = _lowerChannel.Read();
// TODO: Move into decorator.
if (_ioTraceLog.IsDebugEnabled)
@@ -49,6 +49,21 @@ namespace Qpid.Client.Transport.Socket.Blocking
return result;
}
+ public IAsyncResult BeginRead(AsyncCallback callback, object state)
+ {
+ return _lowerChannel.BeginRead(callback, state);
+ }
+
+ public ByteBuffer EndRead(IAsyncResult result)
+ {
+ ByteBuffer buffer = _lowerChannel.EndRead(result);
+ if ( _ioTraceLog.IsDebugEnabled )
+ {
+ _ioTraceLog.Debug(String.Format("READ {0}", buffer));
+ }
+ return buffer;
+ }
+
public void Write(ByteBuffer buffer)
{
// TODO: Move into decorator.
@@ -56,8 +71,22 @@ namespace Qpid.Client.Transport.Socket.Blocking
{
_ioTraceLog.Debug(String.Format("WRITE {0}", buffer));
}
-
- processor.Write(buffer);
+
+ _lowerChannel.Write(buffer);
+ }
+
+ public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state)
+ {
+ if ( _ioTraceLog.IsDebugEnabled )
+ {
+ _ioTraceLog.Debug(String.Format("WRITE {0}", buffer));
+ }
+ return _lowerChannel.BeginWrite(buffer, callback, state);
+ }
+
+ public void EndWrite(IAsyncResult result)
+ {
+ _lowerChannel.EndWrite(result);
}
- }
+ }
} \ No newline at end of file
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs
new file mode 100644
index 0000000000..ac0dc37a16
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using Qpid.Client.Qms;
+
+namespace Qpid.Client.Transport.Socket.Blocking
+{
+ interface ISocketConnector : IDisposable
+ {
+ string LocalEndpoint { get; }
+ Stream Connect(IBrokerInfo broker);
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs
new file mode 100644
index 0000000000..a651413581
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using Qpid.Client.Qms;
+
+namespace Qpid.Client.Transport.Socket.Blocking
+{
+ /// <summary>
+ /// Implements a TCP connection over regular sockets.
+ /// </summary>
+ class SocketConnector : ISocketConnector
+ {
+ private MyTcpClient _tcpClient;
+
+ public string LocalEndpoint
+ {
+ get { return _tcpClient.LocalEndpoint.ToString(); }
+ }
+
+ public Stream Connect(IBrokerInfo broker)
+ {
+ _tcpClient = new MyTcpClient(broker.Host, broker.Port);
+ return _tcpClient.GetStream();
+ }
+
+ public void Dispose()
+ {
+ if ( _tcpClient != null )
+ {
+ _tcpClient.Close();
+ _tcpClient = null;
+ }
+ }
+
+ class MyTcpClient : TcpClient
+ {
+ public MyTcpClient(string host, int port)
+ : base(host, port)
+ {
+ }
+
+ public EndPoint LocalEndpoint
+ {
+ get { return Client.LocalEndPoint; }
+ }
+ }
+
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs
new file mode 100644
index 0000000000..24c3f5bcb8
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.IO;
+using System.Net;
+using log4net;
+using Qpid.Client.Qms;
+using Org.Mentalis.Security.Ssl;
+using MCertificate = Org.Mentalis.Security.Certificates.Certificate;
+using MCertificateChain = Org.Mentalis.Security.Certificates.CertificateChain;
+
+namespace Qpid.Client.Transport.Socket.Blocking
+{
+ /// <summary>
+ /// Implements a TLS v1.0 connection using the Mentalis.org library
+ /// </summary>
+ /// <remarks>
+ /// It would've been easier to implement this at the StreamFilter
+ /// level, but unfortunately the Mentalis library doesn't support
+ /// a passthrough SSL stream class and is tied directly
+ /// to socket-like classes.
+ /// </remarks>
+ class SslSocketConnector : ISocketConnector
+ {
+ private static ILog _logger = LogManager.GetLogger(typeof(SslSocketConnector));
+ private MyTcpClient _tcpClient;
+
+ public string LocalEndpoint
+ {
+ get { return _tcpClient.LocalEndpoint.ToString(); }
+ }
+
+ public Stream Connect(IBrokerInfo broker)
+ {
+ MCertificate cert = GetClientCert(broker);
+ SecurityOptions options = new SecurityOptions(
+ SecureProtocol.Tls1, cert, ConnectionEnd.Client
+ );
+ if ( broker.SslOptions != null
+ && broker.SslOptions.IgnoreValidationErrors )
+ {
+ _logger.Warn("Ignoring any certificate validation errors during SSL handshake...");
+ options.VerificationType = CredentialVerification.None;
+ }
+
+ _tcpClient = new MyTcpClient(broker.Host, broker.Port, options);
+ return _tcpClient.GetStream();
+ }
+
+ public void Dispose()
+ {
+ if ( _tcpClient != null )
+ {
+ _tcpClient.Close();
+ _tcpClient = null;
+ }
+ }
+
+ private static MCertificate GetClientCert(IBrokerInfo broker)
+ {
+ // if a client certificate is configured,
+ // use that to enable mutual authentication
+ MCertificate cert = null;
+ if ( broker.SslOptions != null
+ && broker.SslOptions.ClientCertificate != null )
+ {
+ cert = MCertificate.CreateFromX509Certificate(
+ broker.SslOptions.ClientCertificate
+ );
+ _logger.DebugFormat("Using Client Certificate for SSL '{0}'", cert.ToString(true));
+ }
+ return cert;
+ }
+
+ class MyTcpClient : SecureTcpClient
+ {
+ public MyTcpClient(string host, int port, SecurityOptions options)
+ : base(host, port, options)
+ {
+ }
+
+ public EndPoint LocalEndpoint
+ {
+ get { return Client.LocalEndPoint; }
+ }
+
+ }
+
+ }
+}