From 43f0dea566c81bb72aea75b825bbd97ec5d1d950 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Thu, 3 Dec 2009 23:55:48 +0000 Subject: Fix eol style property git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@886998 13f79535-47bb-0310-9956-ffa450edef68 --- .../Client/AMQAuthenticationException.cs | 78 +-- .../Qpid.Client/Client/AMQNoConsumersException.cs | 90 +-- dotnet/Qpid.Client/Client/AMQNoRouteException.cs | 92 +-- .../AuthenticationConfigurationSectionHandler.cs | 168 +++--- .../Client/Handler/QueueDeleteOkMethodHandler.cs | 88 +-- .../Client/Handler/QueuePurgeOkMethodHandler.cs | 88 +-- .../Qpid.Client/Client/Protocol/DefaultTimeouts.cs | 94 +-- .../Client/Security/CallbackHandlerRegistry.cs | 258 ++++----- .../Client/Security/IAMQCallbackHandler.cs | 70 +-- .../Security/UsernamePasswordCallbackHandler.cs | 112 ++-- dotnet/Qpid.Client/Client/SslOptions.cs | 162 +++--- .../Qpid.Client/Client/Transport/IStreamFilter.cs | 76 +-- dotnet/Qpid.Client/Client/Transport/IoHandler.cs | 644 ++++++++++----------- .../Client/Transport/ProtocolDecoderOutput.cs | 120 ++-- .../Socket/Blocking/BlockingSocketTransport.cs | 300 +++++----- .../Transport/Socket/Blocking/ByteChannel.cs | 184 +++--- .../Transport/Socket/Blocking/ISocketConnector.cs | 68 +-- .../Transport/Socket/Blocking/SocketConnector.cs | 142 ++--- .../Socket/Blocking/SslSocketConnector.cs | 214 +++---- dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs | 196 +++---- 20 files changed, 1622 insertions(+), 1622 deletions(-) (limited to 'dotnet/Qpid.Client/Client') diff --git a/dotnet/Qpid.Client/Client/AMQAuthenticationException.cs b/dotnet/Qpid.Client/Client/AMQAuthenticationException.cs index 6382eaaf39..7bb64e3fff 100644 --- a/dotnet/Qpid.Client/Client/AMQAuthenticationException.cs +++ b/dotnet/Qpid.Client/Client/AMQAuthenticationException.cs @@ -1,39 +1,39 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Runtime.Serialization; - -namespace Apache.Qpid.Client -{ - [Serializable] - public class AMQAuthenticationException : AMQException - { - public AMQAuthenticationException(int error, String message) - : base(error, message) - { - } - - protected AMQAuthenticationException(SerializationInfo info, StreamingContext ctxt) - : base(info, ctxt) - { - } - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.Serialization; + +namespace Apache.Qpid.Client +{ + [Serializable] + public class AMQAuthenticationException : AMQException + { + public AMQAuthenticationException(int error, String message) + : base(error, message) + { + } + + protected AMQAuthenticationException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + } + } +} diff --git a/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs b/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs index 0d93176734..5c9dd86c53 100644 --- a/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs +++ b/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs @@ -1,45 +1,45 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Runtime.Serialization; -using Apache.Qpid.Common; -using Apache.Qpid.Protocol; - -namespace Apache.Qpid.Client -{ - [Serializable] - public class AMQNoConsumersException : AMQUndeliveredException - { - public AMQNoConsumersException(string message) - : this(message, null) - { - } - - public AMQNoConsumersException(string message, object bounced) - : base(AMQConstant.NO_CONSUMERS.Code, message, bounced) - { - } - protected AMQNoConsumersException(SerializationInfo info, StreamingContext ctxt) - : base(info, ctxt) - { - } - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.Serialization; +using Apache.Qpid.Common; +using Apache.Qpid.Protocol; + +namespace Apache.Qpid.Client +{ + [Serializable] + public class AMQNoConsumersException : AMQUndeliveredException + { + public AMQNoConsumersException(string message) + : this(message, null) + { + } + + public AMQNoConsumersException(string message, object bounced) + : base(AMQConstant.NO_CONSUMERS.Code, message, bounced) + { + } + protected AMQNoConsumersException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + } + } +} diff --git a/dotnet/Qpid.Client/Client/AMQNoRouteException.cs b/dotnet/Qpid.Client/Client/AMQNoRouteException.cs index bde3cdd989..5868d78f32 100644 --- a/dotnet/Qpid.Client/Client/AMQNoRouteException.cs +++ b/dotnet/Qpid.Client/Client/AMQNoRouteException.cs @@ -1,46 +1,46 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Runtime.Serialization; -using Apache.Qpid.Common; -using Apache.Qpid.Protocol; - -namespace Apache.Qpid.Client -{ - [Serializable] - public class AMQNoRouteException : AMQUndeliveredException - { - public AMQNoRouteException(string message) - : this(message, null) - { - } - - public AMQNoRouteException(string message, object bounced) - : base(AMQConstant.NO_ROUTE.Code, message, bounced) - { - } - - protected AMQNoRouteException(SerializationInfo info, StreamingContext ctxt) - : base(info, ctxt) - { - } - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.Serialization; +using Apache.Qpid.Common; +using Apache.Qpid.Protocol; + +namespace Apache.Qpid.Client +{ + [Serializable] + public class AMQNoRouteException : AMQUndeliveredException + { + public AMQNoRouteException(string message) + : this(message, null) + { + } + + public AMQNoRouteException(string message, object bounced) + : base(AMQConstant.NO_ROUTE.Code, message, bounced) + { + } + + protected AMQNoRouteException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + } + } +} diff --git a/dotnet/Qpid.Client/Client/Configuration/AuthenticationConfigurationSectionHandler.cs b/dotnet/Qpid.Client/Client/Configuration/AuthenticationConfigurationSectionHandler.cs index ae9225a53a..8d289fa956 100644 --- a/dotnet/Qpid.Client/Client/Configuration/AuthenticationConfigurationSectionHandler.cs +++ b/dotnet/Qpid.Client/Client/Configuration/AuthenticationConfigurationSectionHandler.cs @@ -1,84 +1,84 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -using System; -using System.Collections; -using System.Collections.Specialized; -using System.Configuration; -using System.Text; - -using Apache.Qpid.Client.Security; -using Apache.Qpid.Sasl.Mechanisms; - -namespace Apache.Qpid.Client.Configuration -{ - public class AuthenticationConfigurationSectionHandler - : IConfigurationSectionHandler - { - - public object Create(object parent, object configContext, System.Xml.XmlNode section) - { - NameValueSectionHandler handler = new NameValueSectionHandler(); - OrderedHashTable schemes = new OrderedHashTable(); - - NameValueCollection options = (NameValueCollection) - handler.Create(parent, configContext, section); - - if ( options != null ) - { - foreach ( string key in options.Keys ) - { - Type type = Type.GetType(options[key]); - if ( type == null ) - throw new ConfigurationException(string.Format("Type '{0}' not found", key)); - if ( !typeof(IAMQCallbackHandler).IsAssignableFrom(type) ) - throw new ConfigurationException(string.Format("Type '{0}' does not implement IAMQCallbackHandler", key)); - - schemes.Add(key, type); - } - } - - return schemes; - } - - } // class AuthenticationConfigurationSectionHandler - - public class OrderedHashTable : Hashtable - { - private ArrayList _keys = new ArrayList(); - - public IList OrderedKeys - { - get { return _keys; } - } - - public override void Add(object key, object value) - { - base.Add(key, value); - _keys.Add(key); - } - public override void Remove(object key) - { - base.Remove(key); - _keys.Remove(key); - } - } -} // namespace Apache.Qpid.Client.Configuration +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +using System; +using System.Collections; +using System.Collections.Specialized; +using System.Configuration; +using System.Text; + +using Apache.Qpid.Client.Security; +using Apache.Qpid.Sasl.Mechanisms; + +namespace Apache.Qpid.Client.Configuration +{ + public class AuthenticationConfigurationSectionHandler + : IConfigurationSectionHandler + { + + public object Create(object parent, object configContext, System.Xml.XmlNode section) + { + NameValueSectionHandler handler = new NameValueSectionHandler(); + OrderedHashTable schemes = new OrderedHashTable(); + + NameValueCollection options = (NameValueCollection) + handler.Create(parent, configContext, section); + + if ( options != null ) + { + foreach ( string key in options.Keys ) + { + Type type = Type.GetType(options[key]); + if ( type == null ) + throw new ConfigurationException(string.Format("Type '{0}' not found", key)); + if ( !typeof(IAMQCallbackHandler).IsAssignableFrom(type) ) + throw new ConfigurationException(string.Format("Type '{0}' does not implement IAMQCallbackHandler", key)); + + schemes.Add(key, type); + } + } + + return schemes; + } + + } // class AuthenticationConfigurationSectionHandler + + public class OrderedHashTable : Hashtable + { + private ArrayList _keys = new ArrayList(); + + public IList OrderedKeys + { + get { return _keys; } + } + + public override void Add(object key, object value) + { + base.Add(key, value); + _keys.Add(key); + } + public override void Remove(object key) + { + base.Remove(key); + _keys.Remove(key); + } + } +} // namespace Apache.Qpid.Client.Configuration diff --git a/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs index 7290d758f8..70aa3e1078 100644 --- a/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs +++ b/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs @@ -1,44 +1,44 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using log4net; -using Apache.Qpid.Client.Message; -using Apache.Qpid.Client.Protocol; -using Apache.Qpid.Client.State; -using Apache.Qpid.Framing; - -namespace Apache.Qpid.Client.Handler -{ - public class QueueDeleteOkMethodHandler : IStateAwareMethodListener - { - - private static readonly ILog _logger = LogManager.GetLogger(typeof(QueueDeleteOkMethodHandler)); - - public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) - { - QueueDeleteOkBody body = (QueueDeleteOkBody)evt.Method; - if (body != null) - { - _logger.InfoFormat("Received Queue.Delete-Ok message, message count {0}", body.MessageCount); - } - } - - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Apache.Qpid.Client.Message; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.State; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Handler +{ + public class QueueDeleteOkMethodHandler : IStateAwareMethodListener + { + + private static readonly ILog _logger = LogManager.GetLogger(typeof(QueueDeleteOkMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + QueueDeleteOkBody body = (QueueDeleteOkBody)evt.Method; + if (body != null) + { + _logger.InfoFormat("Received Queue.Delete-Ok message, message count {0}", body.MessageCount); + } + } + + } +} diff --git a/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs index 8bde707b00..22db70575d 100644 --- a/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs +++ b/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs @@ -1,44 +1,44 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using log4net; -using Apache.Qpid.Client.Message; -using Apache.Qpid.Client.Protocol; -using Apache.Qpid.Client.State; -using Apache.Qpid.Framing; - -namespace Apache.Qpid.Client.Handler -{ - public class QueuePurgeOkMethodHandler : IStateAwareMethodListener - { - - private static readonly ILog _logger = LogManager.GetLogger(typeof(QueuePurgeOkMethodHandler)); - - public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) - { - QueuePurgeOkBody body = (QueuePurgeOkBody)evt.Method; - if (body != null) - { - _logger.InfoFormat("Received Queue.Purge-Ok message, message count {0}", body.MessageCount); - } - } - - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Apache.Qpid.Client.Message; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.State; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Handler +{ + public class QueuePurgeOkMethodHandler : IStateAwareMethodListener + { + + private static readonly ILog _logger = LogManager.GetLogger(typeof(QueuePurgeOkMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + QueuePurgeOkBody body = (QueuePurgeOkBody)evt.Method; + if (body != null) + { + _logger.InfoFormat("Received Queue.Purge-Ok message, message count {0}", body.MessageCount); + } + } + + } +} diff --git a/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs b/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs index 6841b46f54..2f23a1571d 100644 --- a/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs +++ b/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs @@ -1,47 +1,47 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -using System; -using System.Text; - -namespace Apache.Qpid.Client.Protocol -{ - /// - /// Default timeout values for the protocol - /// - sealed class DefaultTimeouts - { - /// - /// Maximum number of milliseconds to wait for a state change - /// in the protocol's state machine - /// - public const int MaxWaitForState = 30* 1000; - /// - /// Maximum number of milliseconds to wait for a reply - /// frame when doing synchronous writer to the broker - /// - public const int MaxWaitForSyncWriter = 30 * 1000; - - private DefaultTimeouts() - { - } - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +using System; +using System.Text; + +namespace Apache.Qpid.Client.Protocol +{ + /// + /// Default timeout values for the protocol + /// + sealed class DefaultTimeouts + { + /// + /// Maximum number of milliseconds to wait for a state change + /// in the protocol's state machine + /// + public const int MaxWaitForState = 30* 1000; + /// + /// Maximum number of milliseconds to wait for a reply + /// frame when doing synchronous writer to the broker + /// + public const int MaxWaitForSyncWriter = 30 * 1000; + + private DefaultTimeouts() + { + } + } +} diff --git a/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs b/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs index 22f1c9d89b..9ac0381850 100644 --- a/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs +++ b/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs @@ -1,129 +1,129 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -using System; -using System.Collections; -using System.Configuration; -using System.Text; -using Apache.Qpid.Sasl; -using Apache.Qpid.Sasl.Mechanisms; - -using Apache.Qpid.Client.Configuration; - -namespace Apache.Qpid.Client.Security -{ - - /// - /// Helper class to map SASL mechanisms to our - /// internal ISaslCallbackHandler implementations. - /// - /// - /// The set of configured callback handlers and their order - /// controls the selection of the SASL mechanism used for authentication. - /// - /// You can either replace the default handler for CRAM-MD5 and PLAIN - /// authentication (the two default options) using the application - /// configuration file. Configuration is done by especifying the SASL - /// mechanism name (e.g PLAIN) and the type implementing the callback handler - /// used to provide any data required by the mechanism like username and password. - /// - /// - /// Callback handler types should implement the IAMQCallbackHandler interface. - /// - /// - /// New callbacks or authentication mechanisms can be configured like this: - /// - /// - /// - /// - ///
- /// - /// - /// - /// - /// - /// - /// - /// - /// ]]> - /// - public sealed class CallbackHandlerRegistry - { - private static CallbackHandlerRegistry _instance = - new CallbackHandlerRegistry(); - private OrderedHashTable _mechanism2HandlerMap; - private string[] _mechanisms; - - public static CallbackHandlerRegistry Instance - { - get { return _instance; } - } - - public string[] Mechanisms - { - get { return _mechanisms; } - } - - private CallbackHandlerRegistry() - { - _mechanism2HandlerMap = (OrderedHashTable) - ConfigurationSettings.GetConfig("qpid.client/authentication"); - - // configure default options if not available - if ( _mechanism2HandlerMap == null ) - _mechanism2HandlerMap = new OrderedHashTable(); - - if ( !_mechanism2HandlerMap.Contains(ExternalSaslClient.Mechanism) ) - _mechanism2HandlerMap.Add(ExternalSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler)); - if ( !_mechanism2HandlerMap.Contains(CramMD5SaslClient.Mechanism) ) - _mechanism2HandlerMap.Add(CramMD5SaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler)); - if ( !_mechanism2HandlerMap.Contains(CramMD5HexSaslClient.Mechanism) ) - _mechanism2HandlerMap.Add(CramMD5HexSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler)); - if ( !_mechanism2HandlerMap.Contains(PlainSaslClient.Mechanism) ) - _mechanism2HandlerMap.Add(PlainSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler)); - - _mechanisms = new string[_mechanism2HandlerMap.Count]; - _mechanism2HandlerMap.OrderedKeys.CopyTo(_mechanisms, 0); - } - - public bool IsSupportedMechanism(string mechanism) - { - return _mechanism2HandlerMap.Contains(mechanism); - } - - public string ChooseMechanism(string mechanisms) - { - IList mechs = mechanisms.Split(' '); - foreach ( string supportedMech in _mechanisms ) - { - if ( mechs.Contains(supportedMech) ) - return supportedMech; - } - return null; - } - - public Type GetCallbackHandler(string mechanism) - { - return (Type)_mechanism2HandlerMap[mechanism]; - } - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +using System; +using System.Collections; +using System.Configuration; +using System.Text; +using Apache.Qpid.Sasl; +using Apache.Qpid.Sasl.Mechanisms; + +using Apache.Qpid.Client.Configuration; + +namespace Apache.Qpid.Client.Security +{ + + /// + /// Helper class to map SASL mechanisms to our + /// internal ISaslCallbackHandler implementations. + /// + /// + /// The set of configured callback handlers and their order + /// controls the selection of the SASL mechanism used for authentication. + /// + /// You can either replace the default handler for CRAM-MD5 and PLAIN + /// authentication (the two default options) using the application + /// configuration file. Configuration is done by especifying the SASL + /// mechanism name (e.g PLAIN) and the type implementing the callback handler + /// used to provide any data required by the mechanism like username and password. + /// + /// + /// Callback handler types should implement the IAMQCallbackHandler interface. + /// + /// + /// New callbacks or authentication mechanisms can be configured like this: + /// + /// + /// + /// + ///
+ /// + /// + /// + /// + /// + /// + /// + /// + /// ]]> + /// + public sealed class CallbackHandlerRegistry + { + private static CallbackHandlerRegistry _instance = + new CallbackHandlerRegistry(); + private OrderedHashTable _mechanism2HandlerMap; + private string[] _mechanisms; + + public static CallbackHandlerRegistry Instance + { + get { return _instance; } + } + + public string[] Mechanisms + { + get { return _mechanisms; } + } + + private CallbackHandlerRegistry() + { + _mechanism2HandlerMap = (OrderedHashTable) + ConfigurationSettings.GetConfig("qpid.client/authentication"); + + // configure default options if not available + if ( _mechanism2HandlerMap == null ) + _mechanism2HandlerMap = new OrderedHashTable(); + + if ( !_mechanism2HandlerMap.Contains(ExternalSaslClient.Mechanism) ) + _mechanism2HandlerMap.Add(ExternalSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler)); + if ( !_mechanism2HandlerMap.Contains(CramMD5SaslClient.Mechanism) ) + _mechanism2HandlerMap.Add(CramMD5SaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler)); + if ( !_mechanism2HandlerMap.Contains(CramMD5HexSaslClient.Mechanism) ) + _mechanism2HandlerMap.Add(CramMD5HexSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler)); + if ( !_mechanism2HandlerMap.Contains(PlainSaslClient.Mechanism) ) + _mechanism2HandlerMap.Add(PlainSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler)); + + _mechanisms = new string[_mechanism2HandlerMap.Count]; + _mechanism2HandlerMap.OrderedKeys.CopyTo(_mechanisms, 0); + } + + public bool IsSupportedMechanism(string mechanism) + { + return _mechanism2HandlerMap.Contains(mechanism); + } + + public string ChooseMechanism(string mechanisms) + { + IList mechs = mechanisms.Split(' '); + foreach ( string supportedMech in _mechanisms ) + { + if ( mechs.Contains(supportedMech) ) + return supportedMech; + } + return null; + } + + public Type GetCallbackHandler(string mechanism) + { + return (Type)_mechanism2HandlerMap[mechanism]; + } + } +} diff --git a/dotnet/Qpid.Client/Client/Security/IAMQCallbackHandler.cs b/dotnet/Qpid.Client/Client/Security/IAMQCallbackHandler.cs index 2560c1d96b..6ff45be04a 100644 --- a/dotnet/Qpid.Client/Client/Security/IAMQCallbackHandler.cs +++ b/dotnet/Qpid.Client/Client/Security/IAMQCallbackHandler.cs @@ -1,35 +1,35 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Text; -using Apache.Qpid.Client.Protocol; -using Apache.Qpid.Sasl; - -namespace Apache.Qpid.Client.Security -{ - public interface IAMQCallbackHandler : ISaslCallbackHandler - { - void Initialize(AMQProtocolSession session); - } - -} - - +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Sasl; + +namespace Apache.Qpid.Client.Security +{ + public interface IAMQCallbackHandler : ISaslCallbackHandler + { + void Initialize(AMQProtocolSession session); + } + +} + + diff --git a/dotnet/Qpid.Client/Client/Security/UsernamePasswordCallbackHandler.cs b/dotnet/Qpid.Client/Client/Security/UsernamePasswordCallbackHandler.cs index 489d4d1665..743ade77c9 100644 --- a/dotnet/Qpid.Client/Client/Security/UsernamePasswordCallbackHandler.cs +++ b/dotnet/Qpid.Client/Client/Security/UsernamePasswordCallbackHandler.cs @@ -1,56 +1,56 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -using System; -using System.Collections; -using System.Text; -using Apache.Qpid.Client.Protocol; -using Apache.Qpid.Sasl; - -namespace Apache.Qpid.Client.Security -{ - internal class UsernamePasswordCallbackHandler : IAMQCallbackHandler - { - private AMQProtocolSession _session; - - public void Initialize(AMQProtocolSession session) - { - if ( session == null ) - throw new ArgumentNullException("session"); - - _session = session; - } - - public void Handle(ISaslCallback[] callbacks) - { - foreach ( ISaslCallback cb in callbacks ) - { - if ( cb is NameCallback ) - { - ((NameCallback)cb).Text = _session.Username; - } else if ( cb is PasswordCallback ) - { - ((PasswordCallback)cb).Text = _session.Password; - } - } - } - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +using System; +using System.Collections; +using System.Text; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Sasl; + +namespace Apache.Qpid.Client.Security +{ + internal class UsernamePasswordCallbackHandler : IAMQCallbackHandler + { + private AMQProtocolSession _session; + + public void Initialize(AMQProtocolSession session) + { + if ( session == null ) + throw new ArgumentNullException("session"); + + _session = session; + } + + public void Handle(ISaslCallback[] callbacks) + { + foreach ( ISaslCallback cb in callbacks ) + { + if ( cb is NameCallback ) + { + ((NameCallback)cb).Text = _session.Username; + } else if ( cb is PasswordCallback ) + { + ((PasswordCallback)cb).Text = _session.Password; + } + } + } + } +} diff --git a/dotnet/Qpid.Client/Client/SslOptions.cs b/dotnet/Qpid.Client/Client/SslOptions.cs index d637101000..4630121828 100644 --- a/dotnet/Qpid.Client/Client/SslOptions.cs +++ b/dotnet/Qpid.Client/Client/SslOptions.cs @@ -1,81 +1,81 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Security.Cryptography.X509Certificates; - -namespace Apache.Qpid.Client -{ - /// - /// Configures SSL-related options to connect to an AMQP broker. - /// - /// - /// If the server certificate is not trusted by the client, - /// connection will fail. However, you can set the - /// property to true - /// to ignore any certificate verification errors for debugging purposes. - /// - public class SslOptions - { - private X509Certificate _clientCertificate; - private bool _ignoreValidationErrors; - - /// - /// Certificate to present to the broker to authenticate - /// this client connection - /// - public X509Certificate ClientCertificate - { - get { return _clientCertificate; } - } - - /// - /// If true, the validity of the broker certificate - /// will not be verified on connection - /// - public bool IgnoreValidationErrors - { - get { return _ignoreValidationErrors; } - } - - /// - /// Initialize a new instance with default values - /// (No client certificate, don't ignore validation errors) - /// - public SslOptions() - { - } - - /// - /// Initialize a new instance - /// - /// - /// Certificate to use to authenticate the client to the broker - /// - /// - /// If true, ignore any validation errors when validating the server certificate - /// - public SslOptions(X509Certificate clientCertificate, bool ignoreValidationErrors) - { - _clientCertificate = clientCertificate; - _ignoreValidationErrors = ignoreValidationErrors; - } - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Security.Cryptography.X509Certificates; + +namespace Apache.Qpid.Client +{ + /// + /// Configures SSL-related options to connect to an AMQP broker. + /// + /// + /// If the server certificate is not trusted by the client, + /// connection will fail. However, you can set the + /// property to true + /// to ignore any certificate verification errors for debugging purposes. + /// + public class SslOptions + { + private X509Certificate _clientCertificate; + private bool _ignoreValidationErrors; + + /// + /// Certificate to present to the broker to authenticate + /// this client connection + /// + public X509Certificate ClientCertificate + { + get { return _clientCertificate; } + } + + /// + /// If true, the validity of the broker certificate + /// will not be verified on connection + /// + public bool IgnoreValidationErrors + { + get { return _ignoreValidationErrors; } + } + + /// + /// Initialize a new instance with default values + /// (No client certificate, don't ignore validation errors) + /// + public SslOptions() + { + } + + /// + /// Initialize a new instance + /// + /// + /// Certificate to use to authenticate the client to the broker + /// + /// + /// If true, ignore any validation errors when validating the server certificate + /// + public SslOptions(X509Certificate clientCertificate, bool ignoreValidationErrors) + { + _clientCertificate = clientCertificate; + _ignoreValidationErrors = ignoreValidationErrors; + } + } +} diff --git a/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs b/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs index 7195b3ab04..e0e890fc5a 100644 --- a/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs +++ b/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs @@ -1,38 +1,38 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System.IO; - -namespace Apache.Qpid.Client.Transport -{ - /// - /// Defines a way to introduce an arbitrary filtering - /// stream into the stream chain managed by - /// - public interface IStreamFilter - { - /// - /// Creates a new filtering stream on top of another - /// - /// Next stream on the stack - /// A new filtering stream - Stream CreateFilterStream(Stream lowerStream); - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.IO; + +namespace Apache.Qpid.Client.Transport +{ + /// + /// Defines a way to introduce an arbitrary filtering + /// stream into the stream chain managed by + /// + public interface IStreamFilter + { + /// + /// Creates a new filtering stream on top of another + /// + /// Next stream on the stack + /// A new filtering stream + Stream CreateFilterStream(Stream lowerStream); + } +} diff --git a/dotnet/Qpid.Client/Client/Transport/IoHandler.cs b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs index 9ac513069e..0475236d92 100644 --- a/dotnet/Qpid.Client/Client/Transport/IoHandler.cs +++ b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs @@ -1,322 +1,322 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.IO; -using System.Threading; -using log4net; -using Apache.Qpid.Buffer; -using Apache.Qpid.Client.Protocol; - -namespace Apache.Qpid.Client.Transport -{ - /// - /// Responsible for reading and writing - /// ByteBuffers from/to network streams, and handling - /// the stream filters - /// - public class IoHandler : IByteChannel, IDisposable - { - private static readonly ILog _log = LogManager.GetLogger(typeof(IoHandler)); - private const int DEFAULT_BUFFER_SIZE = 32 * 1024; - - private Stream _topStream; - private IProtocolListener _protocolListener; - private int _readBufferSize; - - public int ReadBufferSize - { - get { return _readBufferSize; } - set { _readBufferSize = value; } - } - - /// - /// Initialize a new instance - /// - /// Underlying network stream - /// Protocol listener to report exceptions to - public IoHandler(Stream stream, IProtocolListener protocolListener) - { - if ( stream == null ) - throw new ArgumentNullException("stream"); - if ( protocolListener == null ) - throw new ArgumentNullException("protocolListener"); - - // initially, the stream at the top of the filter - // chain is the underlying network stream - _topStream = stream; - _protocolListener = protocolListener; - _readBufferSize = DEFAULT_BUFFER_SIZE; - } - - /// - /// Adds a new filter on the top of the chain - /// - /// Stream filter to put on top of the chain - /// - /// This should *only* be called during initialization. We don't - /// support changing the filter change after the first read/write - /// has been done and it's not thread-safe to boot! - /// - public void AddFilter(IStreamFilter filter) - { - _topStream = filter.CreateFilterStream(_topStream); - } - - #region IByteChannel Implementation - // - // IByteChannel Implementation - // - - /// - /// Read a from the underlying - /// network stream and any configured filters - /// - /// A ByteBuffer, if available - public ByteBuffer Read() - { - byte[] bytes = AllocateBuffer(); - - int numOctets = _topStream.Read(bytes, 0, bytes.Length); - - return WrapByteArray(bytes, numOctets); - } - - /// - /// Begin an asynchronous read operation - /// - /// Callback method to call when read operation completes - /// State object - /// An object - public IAsyncResult BeginRead(AsyncCallback callback, object state) - { - byte[] bytes = AllocateBuffer(); - ReadData rd = new ReadData(callback, state, bytes); - - // only put a callback if the caller wants one. - AsyncCallback myCallback = null; - if ( callback != null ) - myCallback = new AsyncCallback(OnAsyncReadDone); - - IAsyncResult result = _topStream.BeginRead( - bytes, 0, bytes.Length, myCallback,rd - ); - return new WrappedAsyncResult(result, bytes); - } - - /// - /// End an asynchronous read operation - /// - /// The object returned from - /// The read - public ByteBuffer EndRead(IAsyncResult result) - { - WrappedAsyncResult theResult = (WrappedAsyncResult)result; - int bytesRead = _topStream.EndRead(theResult.InnerResult); - return WrapByteArray(theResult.Buffer, bytesRead); - } - - /// - /// Write a to the underlying network - /// stream, going through any configured filters - /// - /// - public void Write(ByteBuffer buffer) - { - try - { - _topStream.Write(buffer.Array, buffer.Position, buffer.Limit); // FIXME - } - catch (Exception e) - { - _log.Warn("Write caused exception", e); - _protocolListener.OnException(e); - } - } - - /// - /// Begin an asynchronous write operation - /// - /// Buffer to write - /// A callback to call when the operation completes - /// State object - /// An object - public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state) - { - try - { - return _topStream.BeginWrite( - buffer.Array, buffer.Position, buffer.Limit, - callback, state - ); - } catch ( Exception e ) - { - _log.Error("BeginWrite caused exception", e); - // not clear if an exception here should be propagated? we still - // need to propagate it upwards anyway! - _protocolListener.OnException(e); - throw; - } - } - - /// - /// End an asynchronous write operation - /// - /// The object returned by - public void EndWrite(IAsyncResult result) - { - try - { - _topStream.EndWrite(result); - } catch ( Exception e ) - { - _log.Error("EndWrite caused exception", e); - // not clear if an exception here should be propagated? - _protocolListener.OnException(e); - //throw; - } - } - #endregion // IByteChannel Implementation - - #region IDisposable Implementation - // - // IDisposable Implementation - // - - public void Dispose() - { - if ( _topStream != null ) - { - _topStream.Close(); - } - } - - #endregion // IDisposable Implementation - - #region Private and Helper Classes/Methods - // - // Private and Helper Classes/Methods - // - - private byte[] AllocateBuffer() - { - return new byte[ReadBufferSize]; - } - - private static ByteBuffer WrapByteArray(byte[] bytes, int size) - { - ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes); - byteBuffer.Limit = size; - byteBuffer.Flip(); - - return byteBuffer; - } - - - private static void OnAsyncReadDone(IAsyncResult result) - { - ReadData rd = (ReadData) result.AsyncState; - IAsyncResult wrapped = new WrappedAsyncResult(result, rd.Buffer); - rd.Callback(wrapped); - } - - class ReadData - { - private object _state; - private AsyncCallback _callback; - private byte[] _buffer; - - public object State - { - get { return _state; } - } - - public AsyncCallback Callback - { - get { return _callback; } - } - - public byte[] Buffer - { - get { return _buffer; } - } - - public ReadData(AsyncCallback callback, object state, byte[] buffer) - { - _callback = callback; - _state = state; - _buffer = buffer; - } - } - - class WrappedAsyncResult : IAsyncResult - { - private IAsyncResult _innerResult; - private byte[] _buffer; - - #region IAsyncResult Properties - // - // IAsyncResult Properties - // - public bool IsCompleted - { - get { return _innerResult.IsCompleted; } - } - - public WaitHandle AsyncWaitHandle - { - get { return _innerResult.AsyncWaitHandle; } - } - - public object AsyncState - { - get { return _innerResult.AsyncState; } - } - - public bool CompletedSynchronously - { - get { return _innerResult.CompletedSynchronously; } - } - #endregion // IAsyncResult Properties - - public IAsyncResult InnerResult - { - get { return _innerResult; } - } - public byte[] Buffer - { - get { return _buffer; } - } - - public WrappedAsyncResult(IAsyncResult result, byte[] buffer) - { - if ( result == null ) - throw new ArgumentNullException("result"); - if ( buffer == null ) - throw new ArgumentNullException("buffer"); - - _innerResult = result; - _buffer = buffer; - } - } - - #endregion // Private and Helper Classes/Methods - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.IO; +using System.Threading; +using log4net; +using Apache.Qpid.Buffer; +using Apache.Qpid.Client.Protocol; + +namespace Apache.Qpid.Client.Transport +{ + /// + /// Responsible for reading and writing + /// ByteBuffers from/to network streams, and handling + /// the stream filters + /// + public class IoHandler : IByteChannel, IDisposable + { + private static readonly ILog _log = LogManager.GetLogger(typeof(IoHandler)); + private const int DEFAULT_BUFFER_SIZE = 32 * 1024; + + private Stream _topStream; + private IProtocolListener _protocolListener; + private int _readBufferSize; + + public int ReadBufferSize + { + get { return _readBufferSize; } + set { _readBufferSize = value; } + } + + /// + /// Initialize a new instance + /// + /// Underlying network stream + /// Protocol listener to report exceptions to + public IoHandler(Stream stream, IProtocolListener protocolListener) + { + if ( stream == null ) + throw new ArgumentNullException("stream"); + if ( protocolListener == null ) + throw new ArgumentNullException("protocolListener"); + + // initially, the stream at the top of the filter + // chain is the underlying network stream + _topStream = stream; + _protocolListener = protocolListener; + _readBufferSize = DEFAULT_BUFFER_SIZE; + } + + /// + /// Adds a new filter on the top of the chain + /// + /// Stream filter to put on top of the chain + /// + /// This should *only* be called during initialization. We don't + /// support changing the filter change after the first read/write + /// has been done and it's not thread-safe to boot! + /// + public void AddFilter(IStreamFilter filter) + { + _topStream = filter.CreateFilterStream(_topStream); + } + + #region IByteChannel Implementation + // + // IByteChannel Implementation + // + + /// + /// Read a from the underlying + /// network stream and any configured filters + /// + /// A ByteBuffer, if available + public ByteBuffer Read() + { + byte[] bytes = AllocateBuffer(); + + int numOctets = _topStream.Read(bytes, 0, bytes.Length); + + return WrapByteArray(bytes, numOctets); + } + + /// + /// Begin an asynchronous read operation + /// + /// Callback method to call when read operation completes + /// State object + /// An object + public IAsyncResult BeginRead(AsyncCallback callback, object state) + { + byte[] bytes = AllocateBuffer(); + ReadData rd = new ReadData(callback, state, bytes); + + // only put a callback if the caller wants one. + AsyncCallback myCallback = null; + if ( callback != null ) + myCallback = new AsyncCallback(OnAsyncReadDone); + + IAsyncResult result = _topStream.BeginRead( + bytes, 0, bytes.Length, myCallback,rd + ); + return new WrappedAsyncResult(result, bytes); + } + + /// + /// End an asynchronous read operation + /// + /// The object returned from + /// The read + public ByteBuffer EndRead(IAsyncResult result) + { + WrappedAsyncResult theResult = (WrappedAsyncResult)result; + int bytesRead = _topStream.EndRead(theResult.InnerResult); + return WrapByteArray(theResult.Buffer, bytesRead); + } + + /// + /// Write a to the underlying network + /// stream, going through any configured filters + /// + /// + public void Write(ByteBuffer buffer) + { + try + { + _topStream.Write(buffer.Array, buffer.Position, buffer.Limit); // FIXME + } + catch (Exception e) + { + _log.Warn("Write caused exception", e); + _protocolListener.OnException(e); + } + } + + /// + /// Begin an asynchronous write operation + /// + /// Buffer to write + /// A callback to call when the operation completes + /// State object + /// An object + public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state) + { + try + { + return _topStream.BeginWrite( + buffer.Array, buffer.Position, buffer.Limit, + callback, state + ); + } catch ( Exception e ) + { + _log.Error("BeginWrite caused exception", e); + // not clear if an exception here should be propagated? we still + // need to propagate it upwards anyway! + _protocolListener.OnException(e); + throw; + } + } + + /// + /// End an asynchronous write operation + /// + /// The object returned by + public void EndWrite(IAsyncResult result) + { + try + { + _topStream.EndWrite(result); + } catch ( Exception e ) + { + _log.Error("EndWrite caused exception", e); + // not clear if an exception here should be propagated? + _protocolListener.OnException(e); + //throw; + } + } + #endregion // IByteChannel Implementation + + #region IDisposable Implementation + // + // IDisposable Implementation + // + + public void Dispose() + { + if ( _topStream != null ) + { + _topStream.Close(); + } + } + + #endregion // IDisposable Implementation + + #region Private and Helper Classes/Methods + // + // Private and Helper Classes/Methods + // + + private byte[] AllocateBuffer() + { + return new byte[ReadBufferSize]; + } + + private static ByteBuffer WrapByteArray(byte[] bytes, int size) + { + ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes); + byteBuffer.Limit = size; + byteBuffer.Flip(); + + return byteBuffer; + } + + + private static void OnAsyncReadDone(IAsyncResult result) + { + ReadData rd = (ReadData) result.AsyncState; + IAsyncResult wrapped = new WrappedAsyncResult(result, rd.Buffer); + rd.Callback(wrapped); + } + + class ReadData + { + private object _state; + private AsyncCallback _callback; + private byte[] _buffer; + + public object State + { + get { return _state; } + } + + public AsyncCallback Callback + { + get { return _callback; } + } + + public byte[] Buffer + { + get { return _buffer; } + } + + public ReadData(AsyncCallback callback, object state, byte[] buffer) + { + _callback = callback; + _state = state; + _buffer = buffer; + } + } + + class WrappedAsyncResult : IAsyncResult + { + private IAsyncResult _innerResult; + private byte[] _buffer; + + #region IAsyncResult Properties + // + // IAsyncResult Properties + // + public bool IsCompleted + { + get { return _innerResult.IsCompleted; } + } + + public WaitHandle AsyncWaitHandle + { + get { return _innerResult.AsyncWaitHandle; } + } + + public object AsyncState + { + get { return _innerResult.AsyncState; } + } + + public bool CompletedSynchronously + { + get { return _innerResult.CompletedSynchronously; } + } + #endregion // IAsyncResult Properties + + public IAsyncResult InnerResult + { + get { return _innerResult; } + } + public byte[] Buffer + { + get { return _buffer; } + } + + public WrappedAsyncResult(IAsyncResult result, byte[] buffer) + { + if ( result == null ) + throw new ArgumentNullException("result"); + if ( buffer == null ) + throw new ArgumentNullException("buffer"); + + _innerResult = result; + _buffer = buffer; + } + } + + #endregion // Private and Helper Classes/Methods + } +} diff --git a/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs b/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs index 3841c158e4..9fa313152f 100644 --- a/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs +++ b/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs @@ -1,60 +1,60 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Threading; -using Apache.Qpid.Client.Protocol; -using Apache.Qpid.Codec; -using Apache.Qpid.Framing; -using log4net; - -namespace Apache.Qpid.Client.Transport -{ - /// - /// implementation that forwards - /// each as it is decoded to the - /// protocol listener - /// - internal class ProtocolDecoderOutput : IProtocolDecoderOutput - { - private IProtocolListener _protocolListener; - static readonly ILog _protocolTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ProtocolChannel"); - - public ProtocolDecoderOutput(IProtocolListener protocolListener) - { - if ( protocolListener == null ) - throw new ArgumentNullException("protocolListener"); - - _protocolListener = protocolListener; - } - - public void Write(object message) - { - IDataBlock block = message as IDataBlock; - if ( block != null ) - { - _protocolTraceLog.Debug(String.Format("READ {0}", block)); - _protocolListener.OnMessage(block); - } - } - } -} // namespace Apache.Qpid.Client.Transport - - +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Codec; +using Apache.Qpid.Framing; +using log4net; + +namespace Apache.Qpid.Client.Transport +{ + /// + /// implementation that forwards + /// each as it is decoded to the + /// protocol listener + /// + internal class ProtocolDecoderOutput : IProtocolDecoderOutput + { + private IProtocolListener _protocolListener; + static readonly ILog _protocolTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ProtocolChannel"); + + public ProtocolDecoderOutput(IProtocolListener protocolListener) + { + if ( protocolListener == null ) + throw new ArgumentNullException("protocolListener"); + + _protocolListener = protocolListener; + } + + public void Write(object message) + { + IDataBlock block = message as IDataBlock; + if ( block != null ) + { + _protocolTraceLog.Debug(String.Format("READ {0}", block)); + _protocolListener.OnMessage(block); + } + } + } +} // namespace Apache.Qpid.Client.Transport + + diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs index 8a16f9a675..f336d8a80a 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs @@ -1,150 +1,150 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Collections; -using System.IO; -using System.Threading; -using Apache.Qpid.Client.Qms; -using Apache.Qpid.Client.Protocol; -using Apache.Qpid.Codec; -using Apache.Qpid.Framing; - -namespace Apache.Qpid.Client.Transport.Socket.Blocking -{ - /// - /// TCP Socket transport supporting both - /// SSL and non-SSL connections. - /// - public class BlockingSocketTransport : ITransport - { - // Configuration variables. - IProtocolListener _protocolListener; - - // Runtime variables. - private ISocketConnector _connector; - private IoHandler _ioHandler; - private AmqpChannel _amqpChannel; - private ManualResetEvent _stopEvent; - - public IProtocolWriter ProtocolWriter - { - get { return _amqpChannel; } - } - public string LocalEndpoint - { - get { return _connector.LocalEndpoint; } - } - - - /// - /// Connect to the specified broker - /// - /// The broker to connect to - /// The AMQ connection - public void Connect(IBrokerInfo broker, AMQConnection connection) - { - _stopEvent = new ManualResetEvent(false); - _protocolListener = connection.ProtocolListener; - - _ioHandler = MakeBrokerConnection(broker, connection); - // todo: get default read size from config! - - IProtocolDecoderOutput decoderOutput = - new ProtocolDecoderOutput(_protocolListener); - _amqpChannel = - new AmqpChannel(new ByteChannel(_ioHandler), decoderOutput); - - // post an initial async read - _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this); - } - - /// - /// Close the broker connection - /// - public void Close() - { - StopReading(); - CloseBrokerConnection(); - } - - private void StopReading() - { - _stopEvent.Set(); - } - - private void CloseBrokerConnection() - { - if ( _ioHandler != null ) - { - _ioHandler.Dispose(); - _ioHandler = null; - } - if ( _connector != null ) - { - _connector.Dispose(); - _connector = null; - } - } - - private IoHandler MakeBrokerConnection(IBrokerInfo broker, AMQConnection connection) - { - if ( broker.UseSSL ) - { - _connector = new SslSocketConnector(); - } else - { - _connector = new SocketConnector(); - } - - Stream stream = _connector.Connect(broker); - return new IoHandler(stream, connection.ProtocolListener); - } - - private void OnAsyncReadDone(IAsyncResult result) - { - try - { - _amqpChannel.EndRead(result); - - bool stopping = _stopEvent.WaitOne(0, false); - if ( !stopping ) - _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null); - } catch ( Exception e ) - { - // ignore any errors during closing - bool stopping = _stopEvent.WaitOne(0, false); - if ( !stopping ) - _protocolListener.OnException(e); - } - } - - #region IProtocolDecoderOutput Members - - public void Write(object message) - { - _protocolListener.OnMessage((IDataBlock)message); - } - - #endregion - } -} - - +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.IO; +using System.Threading; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Codec; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Transport.Socket.Blocking +{ + /// + /// TCP Socket transport supporting both + /// SSL and non-SSL connections. + /// + public class BlockingSocketTransport : ITransport + { + // Configuration variables. + IProtocolListener _protocolListener; + + // Runtime variables. + private ISocketConnector _connector; + private IoHandler _ioHandler; + private AmqpChannel _amqpChannel; + private ManualResetEvent _stopEvent; + + public IProtocolWriter ProtocolWriter + { + get { return _amqpChannel; } + } + public string LocalEndpoint + { + get { return _connector.LocalEndpoint; } + } + + + /// + /// Connect to the specified broker + /// + /// The broker to connect to + /// The AMQ connection + public void Connect(IBrokerInfo broker, AMQConnection connection) + { + _stopEvent = new ManualResetEvent(false); + _protocolListener = connection.ProtocolListener; + + _ioHandler = MakeBrokerConnection(broker, connection); + // todo: get default read size from config! + + IProtocolDecoderOutput decoderOutput = + new ProtocolDecoderOutput(_protocolListener); + _amqpChannel = + new AmqpChannel(new ByteChannel(_ioHandler), decoderOutput); + + // post an initial async read + _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this); + } + + /// + /// Close the broker connection + /// + public void Close() + { + StopReading(); + CloseBrokerConnection(); + } + + private void StopReading() + { + _stopEvent.Set(); + } + + private void CloseBrokerConnection() + { + if ( _ioHandler != null ) + { + _ioHandler.Dispose(); + _ioHandler = null; + } + if ( _connector != null ) + { + _connector.Dispose(); + _connector = null; + } + } + + private IoHandler MakeBrokerConnection(IBrokerInfo broker, AMQConnection connection) + { + if ( broker.UseSSL ) + { + _connector = new SslSocketConnector(); + } else + { + _connector = new SocketConnector(); + } + + Stream stream = _connector.Connect(broker); + return new IoHandler(stream, connection.ProtocolListener); + } + + private void OnAsyncReadDone(IAsyncResult result) + { + try + { + _amqpChannel.EndRead(result); + + bool stopping = _stopEvent.WaitOne(0, false); + if ( !stopping ) + _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null); + } catch ( Exception e ) + { + // ignore any errors during closing + bool stopping = _stopEvent.WaitOne(0, false); + if ( !stopping ) + _protocolListener.OnException(e); + } + } + + #region IProtocolDecoderOutput Members + + public void Write(object message) + { + _protocolListener.OnMessage((IDataBlock)message); + } + + #endregion + } +} + + diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs index 73575c7086..4540f01f4e 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs @@ -1,92 +1,92 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using log4net; -using Apache.Qpid.Buffer; - -namespace Apache.Qpid.Client.Transport.Socket.Blocking -{ - class ByteChannel : IByteChannel - { - // Warning: don't use this log for regular logging. - private static readonly ILog _ioTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ByteChannel"); - - private IByteChannel _lowerChannel; - - public ByteChannel(IByteChannel lowerChannel) - { - _lowerChannel = lowerChannel; - } - - public ByteBuffer Read() - { - ByteBuffer result = _lowerChannel.Read(); - - // TODO: Move into decorator. - if (_ioTraceLog.IsDebugEnabled) - { - _ioTraceLog.Debug(String.Format("READ {0}", result)); - } - - return result; - } - - public IAsyncResult BeginRead(AsyncCallback callback, object state) - { - return _lowerChannel.BeginRead(callback, state); - } - - public ByteBuffer EndRead(IAsyncResult result) - { - ByteBuffer buffer = _lowerChannel.EndRead(result); - if ( _ioTraceLog.IsDebugEnabled ) - { - _ioTraceLog.Debug(String.Format("READ {0}", buffer)); - } - return buffer; - } - - public void Write(ByteBuffer buffer) - { - // TODO: Move into decorator. - if (_ioTraceLog.IsDebugEnabled) - { - _ioTraceLog.Debug(String.Format("WRITE {0}", buffer)); - } - - _lowerChannel.Write(buffer); - } - - public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state) - { - if ( _ioTraceLog.IsDebugEnabled ) - { - _ioTraceLog.Debug(String.Format("WRITE {0}", buffer)); - } - return _lowerChannel.BeginWrite(buffer, callback, state); - } - - public void EndWrite(IAsyncResult result) - { - _lowerChannel.EndWrite(result); - } - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using log4net; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Client.Transport.Socket.Blocking +{ + class ByteChannel : IByteChannel + { + // Warning: don't use this log for regular logging. + private static readonly ILog _ioTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ByteChannel"); + + private IByteChannel _lowerChannel; + + public ByteChannel(IByteChannel lowerChannel) + { + _lowerChannel = lowerChannel; + } + + public ByteBuffer Read() + { + ByteBuffer result = _lowerChannel.Read(); + + // TODO: Move into decorator. + if (_ioTraceLog.IsDebugEnabled) + { + _ioTraceLog.Debug(String.Format("READ {0}", result)); + } + + return result; + } + + public IAsyncResult BeginRead(AsyncCallback callback, object state) + { + return _lowerChannel.BeginRead(callback, state); + } + + public ByteBuffer EndRead(IAsyncResult result) + { + ByteBuffer buffer = _lowerChannel.EndRead(result); + if ( _ioTraceLog.IsDebugEnabled ) + { + _ioTraceLog.Debug(String.Format("READ {0}", buffer)); + } + return buffer; + } + + public void Write(ByteBuffer buffer) + { + // TODO: Move into decorator. + if (_ioTraceLog.IsDebugEnabled) + { + _ioTraceLog.Debug(String.Format("WRITE {0}", buffer)); + } + + _lowerChannel.Write(buffer); + } + + public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state) + { + if ( _ioTraceLog.IsDebugEnabled ) + { + _ioTraceLog.Debug(String.Format("WRITE {0}", buffer)); + } + return _lowerChannel.BeginWrite(buffer, callback, state); + } + + public void EndWrite(IAsyncResult result) + { + _lowerChannel.EndWrite(result); + } + } +} diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs index 3d5d2898cf..137fa19c0d 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs @@ -1,34 +1,34 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.IO; -using Apache.Qpid.Client.Qms; - -namespace Apache.Qpid.Client.Transport.Socket.Blocking -{ - interface ISocketConnector : IDisposable - { - string LocalEndpoint { get; } - Stream Connect(IBrokerInfo broker); - } -} - - +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.IO; +using Apache.Qpid.Client.Qms; + +namespace Apache.Qpid.Client.Transport.Socket.Blocking +{ + interface ISocketConnector : IDisposable + { + string LocalEndpoint { get; } + Stream Connect(IBrokerInfo broker); + } +} + + diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs index 83f7287e9b..b6dd8c3be1 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs @@ -1,71 +1,71 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System.IO; -using System.Net; -using System.Net.Sockets; -using Apache.Qpid.Client.Qms; - -namespace Apache.Qpid.Client.Transport.Socket.Blocking -{ - /// - /// Implements a TCP connection over regular sockets. - /// - class SocketConnector : ISocketConnector - { - private MyTcpClient _tcpClient; - - public string LocalEndpoint - { - get { return _tcpClient.LocalEndpoint.ToString(); } - } - - public Stream Connect(IBrokerInfo broker) - { - _tcpClient = new MyTcpClient(broker.Host, broker.Port); - return _tcpClient.GetStream(); - } - - public void Dispose() - { - if ( _tcpClient != null ) - { - _tcpClient.Close(); - _tcpClient = null; - } - } - - class MyTcpClient : TcpClient - { - public MyTcpClient(string host, int port) - : base(host, port) - { - } - - public EndPoint LocalEndpoint - { - get { return Client.LocalEndPoint; } - } - } - - } -} - - +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.IO; +using System.Net; +using System.Net.Sockets; +using Apache.Qpid.Client.Qms; + +namespace Apache.Qpid.Client.Transport.Socket.Blocking +{ + /// + /// Implements a TCP connection over regular sockets. + /// + class SocketConnector : ISocketConnector + { + private MyTcpClient _tcpClient; + + public string LocalEndpoint + { + get { return _tcpClient.LocalEndpoint.ToString(); } + } + + public Stream Connect(IBrokerInfo broker) + { + _tcpClient = new MyTcpClient(broker.Host, broker.Port); + return _tcpClient.GetStream(); + } + + public void Dispose() + { + if ( _tcpClient != null ) + { + _tcpClient.Close(); + _tcpClient = null; + } + } + + class MyTcpClient : TcpClient + { + public MyTcpClient(string host, int port) + : base(host, port) + { + } + + public EndPoint LocalEndpoint + { + get { return Client.LocalEndPoint; } + } + } + + } +} + + diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs index 708edde48c..8436e6fc4f 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs @@ -1,107 +1,107 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System.IO; -using System.Net; -using log4net; -using Apache.Qpid.Client.Qms; -using Org.Mentalis.Security.Ssl; -using MCertificate = Org.Mentalis.Security.Certificates.Certificate; -using MCertificateChain = Org.Mentalis.Security.Certificates.CertificateChain; - -namespace Apache.Qpid.Client.Transport.Socket.Blocking -{ - /// - /// Implements a TLS v1.0 connection using the Mentalis.org library - /// - /// - /// It would've been easier to implement this at the StreamFilter - /// level, but unfortunately the Mentalis library doesn't support - /// a passthrough SSL stream class and is tied directly - /// to socket-like classes. - /// - class SslSocketConnector : ISocketConnector - { - private static ILog _logger = LogManager.GetLogger(typeof(SslSocketConnector)); - private MyTcpClient _tcpClient; - - public string LocalEndpoint - { - get { return _tcpClient.LocalEndpoint.ToString(); } - } - - public Stream Connect(IBrokerInfo broker) - { - MCertificate cert = GetClientCert(broker); - SecurityOptions options = new SecurityOptions( - SecureProtocol.Tls1, cert, ConnectionEnd.Client - ); - if ( broker.SslOptions != null - && broker.SslOptions.IgnoreValidationErrors ) - { - _logger.Warn("Ignoring any certificate validation errors during SSL handshake..."); - options.VerificationType = CredentialVerification.None; - } - - _tcpClient = new MyTcpClient(broker.Host, broker.Port, options); - return _tcpClient.GetStream(); - } - - public void Dispose() - { - if ( _tcpClient != null ) - { - _tcpClient.Close(); - _tcpClient = null; - } - } - - private static MCertificate GetClientCert(IBrokerInfo broker) - { - // if a client certificate is configured, - // use that to enable mutual authentication - MCertificate cert = null; - if ( broker.SslOptions != null - && broker.SslOptions.ClientCertificate != null ) - { - cert = MCertificate.CreateFromX509Certificate( - broker.SslOptions.ClientCertificate - ); - _logger.DebugFormat("Using Client Certificate for SSL '{0}'", cert.ToString(true)); - } - return cert; - } - - class MyTcpClient : SecureTcpClient - { - public MyTcpClient(string host, int port, SecurityOptions options) - : base(host, port, options) - { - } - - public EndPoint LocalEndpoint - { - get { return Client.LocalEndPoint; } - } - - } - - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.IO; +using System.Net; +using log4net; +using Apache.Qpid.Client.Qms; +using Org.Mentalis.Security.Ssl; +using MCertificate = Org.Mentalis.Security.Certificates.Certificate; +using MCertificateChain = Org.Mentalis.Security.Certificates.CertificateChain; + +namespace Apache.Qpid.Client.Transport.Socket.Blocking +{ + /// + /// Implements a TLS v1.0 connection using the Mentalis.org library + /// + /// + /// It would've been easier to implement this at the StreamFilter + /// level, but unfortunately the Mentalis library doesn't support + /// a passthrough SSL stream class and is tied directly + /// to socket-like classes. + /// + class SslSocketConnector : ISocketConnector + { + private static ILog _logger = LogManager.GetLogger(typeof(SslSocketConnector)); + private MyTcpClient _tcpClient; + + public string LocalEndpoint + { + get { return _tcpClient.LocalEndpoint.ToString(); } + } + + public Stream Connect(IBrokerInfo broker) + { + MCertificate cert = GetClientCert(broker); + SecurityOptions options = new SecurityOptions( + SecureProtocol.Tls1, cert, ConnectionEnd.Client + ); + if ( broker.SslOptions != null + && broker.SslOptions.IgnoreValidationErrors ) + { + _logger.Warn("Ignoring any certificate validation errors during SSL handshake..."); + options.VerificationType = CredentialVerification.None; + } + + _tcpClient = new MyTcpClient(broker.Host, broker.Port, options); + return _tcpClient.GetStream(); + } + + public void Dispose() + { + if ( _tcpClient != null ) + { + _tcpClient.Close(); + _tcpClient = null; + } + } + + private static MCertificate GetClientCert(IBrokerInfo broker) + { + // if a client certificate is configured, + // use that to enable mutual authentication + MCertificate cert = null; + if ( broker.SslOptions != null + && broker.SslOptions.ClientCertificate != null ) + { + cert = MCertificate.CreateFromX509Certificate( + broker.SslOptions.ClientCertificate + ); + _logger.DebugFormat("Using Client Certificate for SSL '{0}'", cert.ToString(true)); + } + return cert; + } + + class MyTcpClient : SecureTcpClient + { + public MyTcpClient(string host, int port, SecurityOptions options) + : base(host, port, options) + { + } + + public EndPoint LocalEndpoint + { + get { return Client.LocalEndPoint; } + } + + } + + } +} diff --git a/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs b/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs index 87bb2a2859..a06de9eac8 100644 --- a/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs +++ b/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs @@ -1,98 +1,98 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Collections; -using System.Text; -using System.Threading; -using Apache.Qpid.Collections; -using Apache.Qpid.Common; - -namespace Apache.Qpid.Client.Util -{ - internal delegate void ThresholdMethod(int currentCount); - - /// - /// Basic bounded queue used to implement prefetching. - /// Notice we do the callbacks here asynchronously to - /// avoid adding more complexity to the channel impl. - /// - internal class FlowControlQueue - { - private BlockingQueue _queue = new LinkedBlockingQueue(); - private int _itemCount; - private int _lowerBound; - private int _upperBound; - private ThresholdMethod _underThreshold; - private ThresholdMethod _overThreshold; - - public FlowControlQueue( - int lowerBound, - int upperBound, - ThresholdMethod underThreshold, - ThresholdMethod overThreshold - ) - { - _lowerBound = lowerBound; - _upperBound = upperBound; - _underThreshold = underThreshold; - _overThreshold = overThreshold; - } - - public void Enqueue(object item) - { - _queue.EnqueueBlocking(item); - int count = Interlocked.Increment(ref _itemCount); - if ( _overThreshold != null ) - { - if ( count == _upperBound ) - { - _overThreshold.BeginInvoke( - count, new AsyncCallback(OnAsyncCallEnd), - _overThreshold - ); - } - } - } - - public object Dequeue() - { - object item = _queue.DequeueBlocking(); - int count = Interlocked.Decrement(ref _itemCount); - if ( _underThreshold != null ) - { - if ( count == _lowerBound ) - { - _underThreshold.BeginInvoke( - count, new AsyncCallback(OnAsyncCallEnd), - _underThreshold - ); - } - } - return item; - } - - private void OnAsyncCallEnd(IAsyncResult res) - { - ThresholdMethod method = (ThresholdMethod)res.AsyncState; - method.EndInvoke(res); - } - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text; +using System.Threading; +using Apache.Qpid.Collections; +using Apache.Qpid.Common; + +namespace Apache.Qpid.Client.Util +{ + internal delegate void ThresholdMethod(int currentCount); + + /// + /// Basic bounded queue used to implement prefetching. + /// Notice we do the callbacks here asynchronously to + /// avoid adding more complexity to the channel impl. + /// + internal class FlowControlQueue + { + private BlockingQueue _queue = new LinkedBlockingQueue(); + private int _itemCount; + private int _lowerBound; + private int _upperBound; + private ThresholdMethod _underThreshold; + private ThresholdMethod _overThreshold; + + public FlowControlQueue( + int lowerBound, + int upperBound, + ThresholdMethod underThreshold, + ThresholdMethod overThreshold + ) + { + _lowerBound = lowerBound; + _upperBound = upperBound; + _underThreshold = underThreshold; + _overThreshold = overThreshold; + } + + public void Enqueue(object item) + { + _queue.EnqueueBlocking(item); + int count = Interlocked.Increment(ref _itemCount); + if ( _overThreshold != null ) + { + if ( count == _upperBound ) + { + _overThreshold.BeginInvoke( + count, new AsyncCallback(OnAsyncCallEnd), + _overThreshold + ); + } + } + } + + public object Dequeue() + { + object item = _queue.DequeueBlocking(); + int count = Interlocked.Decrement(ref _itemCount); + if ( _underThreshold != null ) + { + if ( count == _lowerBound ) + { + _underThreshold.BeginInvoke( + count, new AsyncCallback(OnAsyncCallEnd), + _underThreshold + ); + } + } + return item; + } + + private void OnAsyncCallEnd(IAsyncResult res) + { + ThresholdMethod method = (ThresholdMethod)res.AsyncState; + method.EndInvoke(res); + } + } +} -- cgit v1.2.1