From da7718ef463775acc7d6fbecf2d64c1bbfc39fd8 Mon Sep 17 00:00:00 2001 From: Justin Ross Date: Tue, 19 Apr 2016 23:11:13 +0000 Subject: QPID-7207: Remove files and components that are obsolete or no longer in use; move doc and packaging pieces to the cpp subtree git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1740032 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpBoolean.cs | 57 -- qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpInt.cs | 57 -- .../src/Apache/Qpid/AmqpTypes/AmqpProperties.cs | 301 ------- qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpString.cs | 91 --- qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpType.cs | 33 - .../wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj | 158 ---- qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpUbyte.cs | 57 -- .../src/Apache/Qpid/AmqpTypes/CreateNetModule.bat | 33 - .../Qpid/AmqpTypes/Properties/AssemblyInfo.cs | 55 -- qpid/wcf/src/Apache/Qpid/AmqpTypes/PropertyName.cs | 35 - .../src/Apache/Qpid/Channel/AmqpBinaryBinding.cs | 68 -- .../Channel/AmqpBinaryBindingCollectionElement.cs | 29 - .../AmqpBinaryBindingConfigurationElement.cs | 79 -- qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs | 153 ---- .../Qpid/Channel/AmqpBindingCollectionElement.cs | 29 - .../Channel/AmqpBindingConfigurationElement.cs | 344 -------- .../src/Apache/Qpid/Channel/AmqpChannelFactory.cs | 154 ---- .../src/Apache/Qpid/Channel/AmqpChannelHelpers.cs | 234 ------ .../src/Apache/Qpid/Channel/AmqpChannelListener.cs | 204 ----- qpid/wcf/src/Apache/Qpid/Channel/AmqpCredential.cs | 113 --- .../src/Apache/Qpid/Channel/AmqpCredentialType.cs | 37 - qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs | 75 -- .../src/Apache/Qpid/Channel/AmqpSecurityElement.cs | 126 --- .../src/Apache/Qpid/Channel/AmqpSecurityMode.cs | 37 - .../Qpid/Channel/AmqpTransportBindingElement.cs | 186 ----- .../Apache/Qpid/Channel/AmqpTransportChannel.cs | 642 --------------- .../Apache/Qpid/Channel/AmqpTransportSecurity.cs | 101 --- qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj | 112 --- .../src/Apache/Qpid/Channel/ConnectionManager.cs | 329 -------- .../Apache/Qpid/Channel/Properties/AssemblyInfo.cs | 52 -- qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs | 374 --------- .../src/Apache/Qpid/Channel/RawMessageEncoder.cs | 113 --- .../Qpid/Channel/RawMessageEncoderFactory.cs | 45 -- .../Channel/RawMessageEncodingBindingElement.cs | 102 --- qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs | 353 --------- qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs | 221 ------ qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp | 797 ------------------- .../wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp | 276 ------- qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h | 97 --- qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp | 76 -- qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h | 61 -- qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp | 633 --------------- qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h | 109 --- qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp | 57 -- .../src/Apache/Qpid/Interop/CompletionWaiter.cpp | 145 ---- .../wcf/src/Apache/Qpid/Interop/CompletionWaiter.h | 98 --- .../src/Apache/Qpid/Interop/DtxResourceManager.cpp | 285 ------- .../src/Apache/Qpid/Interop/DtxResourceManager.h | 76 -- qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp | 868 --------------------- qpid/wcf/src/Apache/Qpid/Interop/InputLink.h | 110 --- qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj | 501 ------------ .../src/Apache/Qpid/Interop/MessageBodyStream.cpp | 337 -------- .../src/Apache/Qpid/Interop/MessageBodyStream.h | 131 ---- qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp | 251 ------ qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h | 125 --- qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp | 255 ------ qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h | 75 -- qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp | 304 -------- qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h | 89 --- qpid/wcf/src/Apache/Qpid/Interop/QpidException.h | 37 - qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h | 53 -- qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp | 525 ------------- qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.h | 96 --- qpid/wcf/src/wcfnet.snk | Bin 596 -> 0 bytes 64 files changed, 11656 deletions(-) delete mode 100644 qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpBoolean.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpInt.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpString.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpType.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj delete mode 100644 qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpUbyte.cs delete mode 100755 qpid/wcf/src/Apache/Qpid/AmqpTypes/CreateNetModule.bat delete mode 100644 qpid/wcf/src/Apache/Qpid/AmqpTypes/Properties/AssemblyInfo.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/AmqpTypes/PropertyName.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBinding.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingCollectionElement.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingConfigurationElement.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingCollectionElement.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpCredential.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurityElement.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurityMode.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs delete mode 100644 qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.h delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/InputLink.h delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/QpidException.h delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp delete mode 100644 qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.h delete mode 100644 qpid/wcf/src/wcfnet.snk (limited to 'qpid/wcf/src') diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpBoolean.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpBoolean.cs deleted file mode 100644 index 980ae78361..0000000000 --- a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpBoolean.cs +++ /dev/null @@ -1,57 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.AmqpTypes -{ - using System; - using System.IO; - using System.Collections.Generic; - using System.Text; - - public class AmqpBoolean : AmqpType - { - bool value; - - public AmqpBoolean(bool i) - { - this.value = i; - } - - public override void Encode(byte[] bufer, int offset, int count) - { - throw new NotImplementedException(); - } - - public override int EncodedSize - { - get { throw new NotImplementedException(); } - } - - public override AmqpType Clone() - { - return new AmqpBoolean(this.value); - } - - public bool Value - { - get { return this.value; } - set { this.value = value; } - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpInt.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpInt.cs deleted file mode 100644 index c114e98a71..0000000000 --- a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpInt.cs +++ /dev/null @@ -1,57 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.AmqpTypes -{ - using System; - using System.IO; - using System.Collections.Generic; - using System.Text; - - public class AmqpInt : AmqpType - { - int value; - - public AmqpInt(int i) - { - this.value = i; - } - - public override void Encode(byte[] bufer, int offset, int count) - { - throw new NotImplementedException(); - } - - public override int EncodedSize - { - get { throw new NotImplementedException(); } - } - - public override AmqpType Clone() - { - return new AmqpInt(this.value); - } - - public int Value - { - get { return this.value; } - set { this.value = value; } - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs deleted file mode 100644 index 4099571fe0..0000000000 --- a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs +++ /dev/null @@ -1,301 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.AmqpTypes -{ - using System; - using System.IO; - using System.Collections.Generic; - using System.Text; - - public class AmqpProperties - { - // AMQP 0-10 delivery properties - private bool durable; - private Nullable timeToLive; - private string subject; - - // AMQP 0-10 message properties - private string replyToExchange; - private string replyToRoutingKey; - private byte[] userId; - private byte[] correlationId; - private string contentType; - - // for application and vendor properties - Dictionary propertyMap; - - public AmqpProperties() - { - } - - // AMQP 0-10 "message.delivery-properties - internal bool HasDeliveryProperties - { - get - { - return ((this.subject != null) || this.durable || this.timeToLive.HasValue); - } - } - - internal bool HasMappedProperties - { - get - { - if (this.propertyMap != null) - { - if (this.propertyMap.Count > 0) - { - return true; - } - } - - return false; - } - } - - // AMQP 0-10 "message.message-properties" - internal bool HasMessageProperties - { - get - { - if ((this.replyToExchange != null) || - (this.replyToRoutingKey != null) || - (this.userId != null) || - (this.correlationId != null) || - (this.contentType != null)) - { - return true; - } - - if (this.propertyMap == null) - { - return false; - } - - return (this.propertyMap.Count != 0); - } - } - - public Dictionary PropertyMap - { - get - { - if (this.propertyMap == null) - { - this.propertyMap = new Dictionary(); - } - return propertyMap; - } - set { this.propertyMap = value; } - } - - internal bool Empty - { - get - { - if (this.HasDeliveryProperties || this.HasMessageProperties) - { - return true; - } - return false; - } - } - - public string ContentType - { - get { return contentType; } - // TODO: validate - set { contentType = value; } - } - - public byte[] CorrelationId - { - get { return correlationId; } - set - { - if (value != null) - { - if (value.Length > 65535) - { - throw new ArgumentException("CorrelationId too big"); - } - } - correlationId = value; - } - } - - public byte[] UserId - { - get { return userId; } - set - { - if (value != null) - { - if (value.Length > 65535) - { - throw new ArgumentException("UserId too big"); - } - } - userId = value; - } - } - - public TimeSpan? TimeToLive - { - get { return this.timeToLive; } - set { this.timeToLive = value; } - } - - /// - /// Obsolete: switch to AMQP 1.0 "Subject" naming - /// - public string RoutingKey - { - get { return this.subject; } - set { this.subject = value; } - } - - public string Subject - { - get { return this.subject; } - set { this.subject = value; } - } - - public string ReplyToExchange - { - get { return this.replyToExchange; } - } - - public string ReplyToRoutingKey - { - get { return this.replyToRoutingKey; } - } - - // this changes from 0-10 to 1.0 - public void SetReplyTo(string exchange, string routingKey) - { - if ((exchange == null && routingKey == null)) - { - throw new ArgumentNullException("SetReplyTo"); - } - - this.replyToExchange = exchange; - this.replyToRoutingKey = routingKey; - } - - public bool Durable - { - get { return durable; } - set { durable = value; } - } - - public void Clear() - { - this.timeToLive = null; - this.subject = null; - this.replyToRoutingKey = null; - this.replyToExchange = null; - this.durable = false; - this.contentType = null; - this.userId = null; - this.correlationId = null; - this.propertyMap = null; - } - - public AmqpProperties Clone() - { - // memberwise clone ok for string, byte[], and value types - AmqpProperties clonedProps = (AmqpProperties)this.MemberwiseClone(); - - // deeper copy for the dictionary - if (this.propertyMap != null) - { - if (this.propertyMap.Count > 0) - { - Dictionary clonedDictionary = new Dictionary(this.propertyMap.Count); - foreach (KeyValuePair original in this.propertyMap) - { - clonedDictionary.Add(original.Key, original.Value.Clone()); - } - - clonedProps.propertyMap = clonedDictionary; - } - else - { - clonedProps.propertyMap = null; - } - } - return clonedProps; - } - - // adds/replaces from the other AmqpProperty object. - // just inserts references, i.e. provides shallow copy semantics (see Clone for deep copy) - public void MergeFrom(AmqpProperties other) - { - if (other.timeToLive.HasValue) - { - this.timeToLive = other.timeToLive; - } - - if ((other.replyToRoutingKey != null) || (other.replyToExchange != null)) - { - this.replyToExchange = other.replyToExchange; - this.replyToRoutingKey = other.replyToRoutingKey; - } - - if (other.subject != null) - { - this.subject = other.subject; - } - - if (other.durable) - { - this.durable = true; - } - - if (other.contentType != null) - { - this.contentType = other.contentType; - } - - if (other.correlationId != null) - { - this.correlationId = other.correlationId; - } - - if (other.userId != null) - { - this.userId = other.userId; - } - - if (other.propertyMap != null) - { - if (other.propertyMap.Count > 0) - { - Dictionary thisMap = this.PropertyMap; - foreach (KeyValuePair kvp in other.propertyMap) - { - thisMap[kvp.Key] = kvp.Value; - } - } - } - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpString.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpString.cs deleted file mode 100644 index 87cebe878c..0000000000 --- a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpString.cs +++ /dev/null @@ -1,91 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.AmqpTypes -{ - using System; - using System.IO; - using System.Collections.Generic; - using System.Text; - - // for big strings: str16 in 0-10 and str32 in 1.0 - - public class AmqpString : AmqpType - { - string value; - Encoding encoding; - - public AmqpString(string s) - { - this.value = s; - this.encoding = Encoding.UTF8; - } - - public AmqpString(string s, Encoding enc) - { - ValidateEncoding(enc); - this.value = s; - this.encoding = enc; - } - - public Encoding Encoding - { - get { return encoding; } - set - { - ValidateEncoding(value); - encoding = value; - } - } - - private void ValidateEncoding(Encoding enc) - { - if (value == null) - { - throw new ArgumentNullException("Encoding"); - } - - if ((enc != Encoding.UTF8) && (enc != Encoding.Unicode)) - { - throw new ArgumentException("Encoding not one of UTF8 or Unicode"); - } - } - - public override void Encode(byte[] bufer, int offset, int count) - { - throw new NotImplementedException(); - } - - public override int EncodedSize - { - get { throw new NotImplementedException(); } - } - - public override AmqpType Clone() - { - return new AmqpString(this.value); - } - - public string Value - { - get { return this.value; } - set { this.value = value; } - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpType.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpType.cs deleted file mode 100644 index 8cd3ac9e4a..0000000000 --- a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpType.cs +++ /dev/null @@ -1,33 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.AmqpTypes -{ - using System; - using System.IO; - using System.Collections.Generic; - using System.Text; - - public abstract class AmqpType - { - public abstract void Encode(byte[] bufer, int offset, int count); - public abstract int EncodedSize { get; } - public abstract AmqpType Clone(); - } -} diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj deleted file mode 100644 index 2a544cf6d0..0000000000 --- a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj +++ /dev/null @@ -1,158 +0,0 @@ - - - - - Debug - AnyCPU - 9.0.21022 - 2.0 - {820BFC34-A40F-46BA-B86B-05334854CA17} - Library - Properties - Apache.Qpid.AmqpTypes - Apache.Qpid.AmqpTypes - v3.5 - 512 - OnBuildSuccess - true - ..\..\..\wcfnet.snk - - - true - full - false - bin\Debug\ - DEBUG;TRACE - prompt - 4 - - - pdbonly - true - bin\Release\ - TRACE - prompt - 4 - - - - - 3.5 - - - 3.5 - - - 3.5 - - - - - - - - - - - - - - - - - - - - - cd "$(ProjectDir)bin\$(ConfigurationName)" -del $(AssemblyName).dll -del $(AssemblyName).pdb -cd "$(ProjectDir)obj\$(ConfigurationName)" -del $(AssemblyName).dll -del $(AssemblyName).pdb -cd "$(ProjectDir)" -CreateNetModule.bat $(ConfigurationName) - - \ No newline at end of file diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpUbyte.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpUbyte.cs deleted file mode 100644 index 5ec8a732cf..0000000000 --- a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpUbyte.cs +++ /dev/null @@ -1,57 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.AmqpTypes -{ - using System; - using System.IO; - using System.Collections.Generic; - using System.Text; - - public class AmqpUbyte : AmqpType - { - byte value; - - public AmqpUbyte(byte i) - { - this.value = i; - } - - public override void Encode(byte[] bufer, int offset, int count) - { - throw new NotImplementedException(); - } - - public override int EncodedSize - { - get { throw new NotImplementedException(); } - } - - public override AmqpType Clone() - { - return new AmqpUbyte(this.value); - } - - public byte Value - { - get { return this.value; } - set { this.value = value; } - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/CreateNetModule.bat b/qpid/wcf/src/Apache/Qpid/AmqpTypes/CreateNetModule.bat deleted file mode 100755 index d67e2119f9..0000000000 --- a/qpid/wcf/src/Apache/Qpid/AmqpTypes/CreateNetModule.bat +++ /dev/null @@ -1,33 +0,0 @@ - -REM Licensed to the Apache Software Foundation (ASF) under one -REM or more contributor license agreements. See the NOTICE file -REM distributed with this work for additional information -REM regarding copyright ownership. The ASF licenses this file -REM to you under the Apache License, Version 2.0 (the -REM "License"); you may not use this file except in compliance -REM with the License. You may obtain a copy of the License at -REM -REM http://www.apache.org/licenses/LICENSE-2.0 -REM -REM Unless required by applicable law or agreed to in writing, -REM software distributed under the License is distributed on an -REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -REM KIND, either express or implied. See the License for the -REM specific language governing permissions and limitations -REM under the License. - -if %1 == Release goto release - -echo generating Debug netmodule - -%systemroot%\Microsoft.NET\Framework\v3.5\Csc.exe /noconfig /nowarn:1701,1702 /errorreport:prompt /warn:4 /define:DEBUG;TRACE /reference:"%programfiles%\Reference Assemblies\Microsoft\Framework\v3.5\System.Core.dll" /reference:"%programfiles%\Reference Assemblies\Microsoft\Framework\v3.5\System.Data.DataSetExtensions.dll" /reference:%systemroot%\Microsoft.NET\Framework\v2.0.50727\System.Data.dll /reference:%systemroot%\Microsoft.NET\Framework\v2.0.50727\System.dll /reference:%systemroot%\Microsoft.NET\Framework\v2.0.50727\System.Xml.dll /reference:"%programfiles%\Reference Assemblies\Microsoft\Framework\v3.5\System.Xml.Linq.dll" /debug+ /debug:full /filealign:512 /optimize- /out:obj\Debug\Apache.Qpid.AmqpTypes.netmodule /target:module *.cs - -goto end - -:release - -echo generating Release netmodule - -%systemroot%\Microsoft.NET\Framework\v3.5\Csc.exe /noconfig /nowarn:1701,1702 /errorreport:prompt /warn:4 /define:TRACE /reference:"%programfiles%\Reference Assemblies\Microsoft\Framework\v3.5\System.Core.dll" /reference:"%programfiles%\Reference Assemblies\Microsoft\Framework\v3.5\System.Data.DataSetExtensions.dll" /reference:C:\Windows\Microsoft.NET\Framework\v2.0.50727\System.Data.dll /reference:C:\Windows\Microsoft.NET\Framework\v2.0.50727\System.dll /reference:C:\Windows\Microsoft.NET\Framework\v2.0.50727\System.Xml.dll /reference:"%programfiles%\Reference Assemblies\Microsoft\Framework\v3.5\System.Xml.Linq.dll" /debug:pdbonly /filealign:512 /keyfile:..\..\..\wcfnet.snk /optimize+ /out:obj\Release\Apache.Qpid.AmqpTypes.netmodule /target:module *.cs - -:end \ No newline at end of file diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/Properties/AssemblyInfo.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/Properties/AssemblyInfo.cs deleted file mode 100644 index dffaee0d0d..0000000000 --- a/qpid/wcf/src/Apache/Qpid/AmqpTypes/Properties/AssemblyInfo.cs +++ /dev/null @@ -1,55 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -using System.Reflection; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; - -// General Information about an assembly is controlled through the following -// set of attributes. Change these attribute values to modify the information -// associated with an assembly. -[assembly: AssemblyTitle("Apache.Qpid.AmqpTypes")] -[assembly: AssemblyDescription("")] -[assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("")] -[assembly: AssemblyProduct("")] -[assembly: AssemblyCopyright("")] -[assembly: AssemblyTrademark("")] -[assembly: AssemblyCulture("")] - -// Setting ComVisible to true makes the types in this assembly visible -// to COM components. This is required for this to be used by an -// Excel RTD component. -[assembly: ComVisible(true)] - -// The following GUID is for the ID of the typelib if this project is exposed to COM -[assembly: Guid("79b8b5d9-047d-4f3b-8610-7fe112ce6416")] - -// Version information for an assembly consists of the following four values: -// -// Major Version -// Minor Version -// Build Number -// Revision -// -// You can specify all the values or you can default the Build and Revision Numbers -// by using the '*' as shown below: -// [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/PropertyName.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/PropertyName.cs deleted file mode 100644 index b80f8b9e9e..0000000000 --- a/qpid/wcf/src/Apache/Qpid/AmqpTypes/PropertyName.cs +++ /dev/null @@ -1,35 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.AmqpTypes -{ - using System; - using System.IO; - using System.Collections.Generic; - using System.Text; - - public sealed class PropertyName - { - public const string Priority = "amqpx.priority"; - public const string ContentType = "amqp.content-type"; - public const string ReplyTo = "amqp.reply-to"; - public const string ReplyToExchange = "amqpx.qpid0-10.reply-to-exchange"; - public const string RoutingKey = "amqpx.qpid0-10.routing-key"; - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBinding.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBinding.cs deleted file mode 100644 index d533fc212e..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBinding.cs +++ /dev/null @@ -1,68 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.Collections.Generic; - using System.Collections.ObjectModel; - using System.Configuration; - using System.ServiceModel; - using System.ServiceModel.Channels; - using System.ServiceModel.Configuration; - - using Apache.Qpid.AmqpTypes; - - public class AmqpBinaryBinding : AmqpBinding - { - public AmqpBinaryBinding() -: base (new RawMessageEncodingBindingElement()) - { - } - - public AmqpBinaryBinding(string configurationName) - : this() - { - ApplyConfiguration(configurationName); - } - - private void ApplyConfiguration(string configurationName) - { - BindingsSection wcfBindings = (BindingsSection)ConfigurationManager.GetSection("system.serviceModel/bindings"); - // wcfBindings contains system defined bindings and bindingExtensions - - AmqpBinaryBindingCollectionElement section = (AmqpBinaryBindingCollectionElement)wcfBindings["amqpBinaryBinding"]; - if (section == null) - { - throw new ConfigurationErrorsException("Missing \"amqpBinaryBinding\" configuration section."); - } - - AmqpBinaryBindingConfigurationElement element = section.Bindings[configurationName]; - if (element == null) - { - throw new ConfigurationErrorsException(string.Format(System.Globalization.CultureInfo.CurrentCulture, - "There is no binding named {0} at {1}.", configurationName, section.BindingName)); - } - else - { - element.ApplyConfiguration(this); - } - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingCollectionElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingCollectionElement.cs deleted file mode 100644 index de263bc4ef..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingCollectionElement.cs +++ /dev/null @@ -1,29 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - /// - /// Implement application configuration of bindingExtensions for AmqpBinaryBinding - /// - public class AmqpBinaryBindingCollectionElement - : System.ServiceModel.Configuration.StandardBindingCollectionElement - { - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingConfigurationElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingConfigurationElement.cs deleted file mode 100644 index a537a6c6c3..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingConfigurationElement.cs +++ /dev/null @@ -1,79 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.Collections.Generic; - using System.Collections.ObjectModel; - using System.Configuration; - using System.ServiceModel; - using System.ServiceModel.Channels; - using System.ServiceModel.Configuration; - using Apache.Qpid.AmqpTypes; - - public class AmqpBinaryBindingConfigurationElement : AmqpBindingConfigurationElement - { - public AmqpBinaryBindingConfigurationElement(string configurationName) - : base(configurationName) - { - } - - public AmqpBinaryBindingConfigurationElement() - : this(null) - { - } - - protected override Type BindingElementType - { - get { return typeof(AmqpBinaryBinding); } - } - - protected override ConfigurationPropertyCollection Properties - { - get - { - ConfigurationPropertyCollection properties = base.Properties; - - return properties; - } - } - - protected override void InitializeFrom(Binding binding) - { - base.InitializeFrom(binding); - AmqpBinaryBinding amqpBinding = (AmqpBinaryBinding)binding; - } - - protected override void OnApplyConfiguration(Binding binding) - { - if (binding == null) - throw new ArgumentNullException("binding"); - - if (binding.GetType() != typeof(AmqpBinaryBinding)) - { - throw new ArgumentException(string.Format("Invalid type for configuring an AMQP binding. Expected type: {0}. Type passed in: {1}.", - typeof(AmqpBinaryBinding).AssemblyQualifiedName, - binding.GetType().AssemblyQualifiedName)); - } - - base.OnApplyConfiguration(binding); - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs deleted file mode 100644 index be54f06b2f..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs +++ /dev/null @@ -1,153 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.Collections.Generic; - using System.Collections.ObjectModel; - using System.Configuration; - using System.ServiceModel; - using System.ServiceModel.Channels; - using System.ServiceModel.Configuration; - - using Apache.Qpid.AmqpTypes; - - public class AmqpBinding : Binding - { - protected AmqpTransportBindingElement transport; - protected MessageEncodingBindingElement encoding; - protected AmqpSecurity security; - - public AmqpBinding() - : this(new BinaryMessageEncodingBindingElement()) - { - } - - protected AmqpBinding(MessageEncodingBindingElement encoding) - { - this.encoding = encoding; - transport = new AmqpTransportBindingElement(); - } - - public AmqpBinding(string configurationName) - : this() - { - ApplyConfiguration(configurationName); - } - - public string BrokerHost - { - get { return transport.BrokerHost; } - set { transport.BrokerHost = value; } - } - - public int BrokerPort - { - get { return transport.BrokerPort; } - set { transport.BrokerPort = value; } - } - - public int PrefetchLimit - { - get { return transport.PrefetchLimit; } - set { transport.PrefetchLimit = value; } - } - - public AmqpSecurity Security - { - get - { - if (security == null) - { - if (transport.ChannelProperties.AmqpTransportSecurity == null) - { - transport.ChannelProperties.AmqpTransportSecurity = new AmqpTransportSecurity(); - } - - security = new AmqpSecurity(transport.ChannelProperties.AmqpTransportSecurity); - transport.BindingSecurity = security; - } - - return security; - } - } - - internal bool SecurityEnabled - { - get { return (transport.ChannelProperties.AmqpSecurityMode != AmqpSecurityMode.None); } - } - - public bool Shared - { - get { return transport.Shared; } - set { transport.Shared = value; } - } - - public TransferMode TransferMode - { - get { return transport.TransferMode; } - set { transport.TransferMode = value; } - } - - public AmqpProperties DefaultMessageProperties - { - get { return transport.DefaultMessageProperties; } - set { transport.DefaultMessageProperties = value; } - } - - public override string Scheme - { - get { return AmqpConstants.Scheme; } - } - - public override BindingElementCollection CreateBindingElements() - { - BindingElementCollection bindingElements = new BindingElementCollection(); - - bindingElements.Add(encoding); - bindingElements.Add(transport); - - return bindingElements.Clone(); - } - - private void ApplyConfiguration(string configurationName) - { - BindingsSection wcfBindings = (BindingsSection)ConfigurationManager.GetSection("system.serviceModel/bindings"); - // wcfBindings contains system defined bindings and bindingExtensions - - AmqpBindingCollectionElement section = (AmqpBindingCollectionElement)wcfBindings["amqpBinding"]; - if (section == null) - { - throw new ConfigurationErrorsException("Missing \"amqpBinding\" configuration section."); - } - - AmqpBindingConfigurationElement element = section.Bindings[configurationName]; - if (element == null) - { - throw new ConfigurationErrorsException(string.Format(System.Globalization.CultureInfo.CurrentCulture, - "There is no binding named {0} at {1}.", configurationName, section.BindingName)); - } - else - { - element.ApplyConfiguration(this); - } - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingCollectionElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingCollectionElement.cs deleted file mode 100644 index e8d3b6fad4..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingCollectionElement.cs +++ /dev/null @@ -1,29 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - /// - /// Implement application configuration of bindingExtensions for AmqpBinding - /// - public class AmqpBindingCollectionElement - : System.ServiceModel.Configuration.StandardBindingCollectionElement - { - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs deleted file mode 100644 index edc91e67c1..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs +++ /dev/null @@ -1,344 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.Collections.Generic; - using System.Collections.ObjectModel; - using System.Configuration; - using System.ServiceModel; - using System.ServiceModel.Channels; - using System.ServiceModel.Configuration; - using System.Threading; - using Apache.Qpid.AmqpTypes; - - public class AmqpBindingConfigurationElement : StandardBindingElement - { - // not regular config elements. See PostDeserialize - string brokerHost; - int brokerPort; - - public AmqpBindingConfigurationElement(string configurationName) - : base(configurationName) - { - brokerHost = AmqpDefaults.BrokerHost; - brokerPort = AmqpDefaults.BrokerPort; - } - - public AmqpBindingConfigurationElement() - : this(null) - { - } - - protected override Type BindingElementType - { - get { return typeof(AmqpBinding); } - } - - public string BrokerHost - { - get { return brokerHost; } - set { brokerHost = value; } - } - - public int BrokerPort - { - get { return brokerPort; } - set { brokerPort = value; } - } - - [ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit, DefaultValue = false)] - public int PrefetchLimit - { - get { return (int)base[AmqpConfigurationStrings.PrefetchLimit]; } - set { base[AmqpConfigurationStrings.PrefetchLimit] = value; } - } - - [ConfigurationProperty(AmqpConfigurationStrings.Shared, DefaultValue = false)] - public bool Shared - { - get { return (bool)base[AmqpConfigurationStrings.Shared]; } - set { base[AmqpConfigurationStrings.Shared] = value; } - } - - [ConfigurationProperty(AmqpConfigurationStrings.TransferMode, DefaultValue = AmqpDefaults.TransferMode)] - public TransferMode TransferMode - { - get { return (TransferMode)base[AmqpConfigurationStrings.TransferMode]; } - set { base[AmqpConfigurationStrings.TransferMode] = value; } - } - - [ConfigurationProperty(AmqpConfigurationStrings.Brokers)] - public BrokerCollection Brokers - { - get - { - return (BrokerCollection)base[AmqpConfigurationStrings.Brokers]; - } - set - { - base[AmqpConfigurationStrings.Brokers] = value; - } - } - - [ConfigurationProperty(AmqpConfigurationStrings.Security)] - public AmqpSecurityElement Security - { - get { return (AmqpSecurityElement)base[AmqpConfigurationStrings.Security]; } - set { base[AmqpConfigurationStrings.Security] = value; } - } - - protected override ConfigurationPropertyCollection Properties - { - get - { - ConfigurationPropertyCollection properties = base.Properties; - properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit, - typeof(int), 0, null, null, ConfigurationPropertyOptions.None)); - properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.Shared, - typeof(bool), false, null, null, ConfigurationPropertyOptions.None)); - properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.TransferMode, - typeof(TransferMode), AmqpDefaults.TransferMode, null, null, ConfigurationPropertyOptions.None)); - properties.Add(new ConfigurationProperty("brokers", typeof(BrokerCollection), null)); - properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.Security, typeof(AmqpSecurityElement), null)); - return properties; - } - } - - protected override void InitializeFrom(Binding binding) - { - base.InitializeFrom(binding); - AmqpBinding amqpBinding = (AmqpBinding)binding; - this.BrokerHost = amqpBinding.BrokerHost; - this.BrokerPort = amqpBinding.BrokerPort; - this.TransferMode = amqpBinding.TransferMode; - this.Shared = amqpBinding.Shared; - this.PrefetchLimit = amqpBinding.PrefetchLimit; - - if (!amqpBinding.SecurityEnabled) - { - this.Security = null; - } - else - { - if (this.Security == null) - { - this.Security = new AmqpSecurityElement(); - } - - AmqpTransportSecurity sec = amqpBinding.Security.Transport; - this.Security.Mode = AmqpSecurityMode.Transport; - if (this.Security.Transport == null) - { - this.Security.Transport = new AmqpTransportSecurityElement(); - } - - this.Security.Transport.CredentialType = sec.CredentialType; - this.Security.Transport.IgnoreEndpointCredentials = sec.IgnoreEndpointClientCredentials; - this.Security.Transport.UseSSL = sec.UseSSL; - if (sec.DefaultCredential == null) - { - - this.Security.Transport.DefaultCredential = null; - } - else - { - this.Security.Transport.DefaultCredential = new AmqpCredentialElement(); - this.Security.Transport.DefaultCredential.UserName = sec.DefaultCredential.UserName; - this.Security.Transport.DefaultCredential.Password = sec.DefaultCredential.Password; - } - } - - AmqpProperties props = amqpBinding.DefaultMessageProperties; - } - - protected override void OnApplyConfiguration(Binding binding) - { - if (binding == null) - throw new ArgumentNullException("binding"); - - if (!(binding is AmqpBinding)) - { - throw new ArgumentException(string.Format("Invalid type for configuring an AMQP binding. Expected type: {0}. Type passed in: {1}.", - typeof(AmqpBinding).AssemblyQualifiedName, - binding.GetType().AssemblyQualifiedName)); - } - - AmqpBinding amqpBinding = (AmqpBinding)binding; - amqpBinding.BrokerHost = this.BrokerHost; - amqpBinding.BrokerPort = this.BrokerPort; - amqpBinding.TransferMode = this.TransferMode; - amqpBinding.Shared = this.Shared; - amqpBinding.PrefetchLimit = this.PrefetchLimit; - - AmqpSecurityMode mode = AmqpSecurityMode.None; - if (this.Security != null) - { - mode = this.Security.Mode; - } - - if (mode == AmqpSecurityMode.None) - { - if (amqpBinding.SecurityEnabled) - { - amqpBinding.Security.Mode = AmqpSecurityMode.None; - } - } - else - { - amqpBinding.Security.Mode = AmqpSecurityMode.Transport; - amqpBinding.Security.Transport.CredentialType = this.Security.Transport.CredentialType; - amqpBinding.Security.Transport.IgnoreEndpointClientCredentials = this.Security.Transport.IgnoreEndpointCredentials; - amqpBinding.Security.Transport.UseSSL = this.Security.Transport.UseSSL; - if (this.Security.Transport.DefaultCredential != null) - { - amqpBinding.Security.Transport.DefaultCredential = new AmqpCredential( - this.Security.Transport.DefaultCredential.UserName, - this.Security.Transport.DefaultCredential.Password); - } - else - { - amqpBinding.Security.Transport.DefaultCredential = null; - } - } - } - - - protected override void PostDeserialize() - { - base.PostDeserialize(); - - BrokerCollection brokers = Brokers; - if (brokers != null) - { - if (brokers.Count > 0) - { - // just grab the first element until failover is supported - System.Collections.IEnumerator brokersEnum = brokers.GetEnumerator(); - // move to first element - brokersEnum.MoveNext(); - BrokerElement be = (BrokerElement)brokersEnum.Current; - this.BrokerHost = be.Host; - this.BrokerPort = be.Port; - } - } - } - } - - public class BrokerCollection : ConfigurationElementCollection - { - public BrokerCollection() - { - //this.AddElementName = "broker"; - } - - protected override ConfigurationElement CreateNewElement() - { - return new BrokerElement(); - } - - protected override void BaseAdd(ConfigurationElement element) - { - BrokerElement be = (BrokerElement)element; - if (this.BaseGet((Object)be.Key) != null) - { - throw new ConfigurationErrorsException("duplicate broker definition at line " + element.ElementInformation.LineNumber); - } - base.BaseAdd(element); - } - - protected override Object GetElementKey(ConfigurationElement element) - { - BrokerElement be = (BrokerElement) element; - return be.Key; - } - - protected override void PostDeserialize() - { - base.PostDeserialize(); - if (this.Count == 0) - { - throw new ArgumentException("Brokers collection requires at least one broker"); - } - if (this.Count > 1) - { - Console.WriteLine("Warning: multiple brokers not supported, selecting first instance"); - } - BrokerElement be = (BrokerElement)this.BaseGet(0); - } - - protected override string ElementName - { - get - { - return "broker"; - } - } - - public override ConfigurationElementCollectionType CollectionType - { - get - { - return ConfigurationElementCollectionType.BasicMap; - } - } - } - - public class BrokerElement : ConfigurationElement - { - string key; - - public BrokerElement() - { - Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.BrokerHost, - typeof(string), AmqpDefaults.BrokerHost, null, null, ConfigurationPropertyOptions.None)); - Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.BrokerPort, - typeof(int), AmqpDefaults.BrokerPort, null, null, ConfigurationPropertyOptions.None)); - - } - - [ConfigurationProperty(AmqpConfigurationStrings.BrokerHost, DefaultValue = AmqpDefaults.BrokerHost)] - public string Host - { - get { return (string)base[AmqpConfigurationStrings.BrokerHost]; } - set { base[AmqpConfigurationStrings.BrokerHost] = value; } - } - - [ConfigurationProperty(AmqpConfigurationStrings.BrokerPort, DefaultValue = AmqpDefaults.BrokerPort)] - public int Port - { - get { return (int)base[AmqpConfigurationStrings.BrokerPort]; } - set { base[AmqpConfigurationStrings.BrokerPort] = value; } - } - - public string Key - { - get - { - if (this.key == null) - { - this.key = this.Host + ':' + this.Port; - } - return this.key; - } - } - - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs deleted file mode 100644 index 9b27b00994..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs +++ /dev/null @@ -1,154 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.ServiceModel; - using System.ServiceModel.Channels; - using System.ServiceModel.Description; - using System.Collections.Generic; - using System.Collections.ObjectModel; - - class AmqpChannelFactory : ChannelFactoryBase - { - MessageEncoderFactory messageEncoderFactory; - AmqpTransportBindingElement bindingElement; - AmqpChannelProperties channelProperties; - long maxBufferPoolSize; - bool shared; - int prefetchLimit; - BindingContext bindingContext; - List openChannels; - - internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context) - : base(context.Binding) - { - this.bindingElement = bindingElement; - this.bindingContext = context; - this.channelProperties = bindingElement.ChannelProperties.Clone(); - this.shared = bindingElement.Shared; - this.prefetchLimit = bindingElement.PrefetchLimit; - this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize; - Collection messageEncoderBindingElements - = context.BindingParameters.FindAll(); - - if (messageEncoderBindingElements.Count > 1) - { - throw new InvalidOperationException("More than one MessageEncodingBindingElement was found in the BindingParameters of the BindingContext"); - } - else if (messageEncoderBindingElements.Count == 1) - { - this.messageEncoderFactory = messageEncoderBindingElements[0].CreateMessageEncoderFactory(); - } - else - { - this.messageEncoderFactory = new TextMessageEncodingBindingElement().CreateMessageEncoderFactory(); - } - - openChannels = new List(); - } - - - public override T GetProperty() - { - T mep = messageEncoderFactory.Encoder.GetProperty(); - if (mep != null) - { - return mep; - } - - if (typeof(T) == typeof(MessageVersion)) - { - return (T)(object)messageEncoderFactory.Encoder.MessageVersion; - } - - return base.GetProperty(); - } - - protected override void OnOpen(TimeSpan timeout) - { - // check and freeze security properties now - AmqpSecurityMode mode = AmqpSecurityMode.None; - if (this.bindingElement.BindingSecurity != null) - { - mode = bindingElement.BindingSecurity.Mode; - } - - this.channelProperties.AmqpSecurityMode = mode; - if (mode == AmqpSecurityMode.None) - { - return; - } - - AmqpChannelHelpers.FindAuthenticationCredentials(this.channelProperties, this.bindingContext); - } - - protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) - { - throw new NotImplementedException("AmqpChannelFactory OnBeginOpen"); - //// return null; - } - - protected override void OnEndOpen(IAsyncResult result) - { - throw new NotImplementedException("AmqpChannelFactory OnEndOpen"); - } - - protected override TChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via) - { - AmqpTransportChannel channel = new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared, this.prefetchLimit); - lock (openChannels) - { - channel.Closed += new EventHandler(channel_Closed); - openChannels.Add(channel); - } - return (TChannel)(object) channel; - } - - void channel_Closed(object sender, EventArgs e) - { - if (this.State != CommunicationState.Opened) - { - return; - } - - lock (openChannels) - { - AmqpTransportChannel channel = (AmqpTransportChannel)sender; - if (openChannels.Contains(channel)) - { - openChannels.Remove(channel); - } - } - } - - protected override void OnClose(TimeSpan timeout) - { - base.OnClose(timeout); - lock (openChannels) - { - foreach (AmqpTransportChannel channel in openChannels) - { - channel.Close(timeout); - } - } - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs deleted file mode 100644 index b431689c4d..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs +++ /dev/null @@ -1,234 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.Net; - using System.Net.Sockets; - using System.ServiceModel; - using System.ServiceModel.Channels; - using System.ServiceModel.Description; - using System.Globalization; - - using Apache.Qpid.AmqpTypes; - - /// - /// Collection of constants used by the Amqp Channel classes - /// - static class AmqpConstants - { - internal const string Scheme = "amqp"; - internal const string AmqpBindingSectionName = "system.serviceModel/bindings/amqpBinding"; - internal const string AmqpBinaryBindingSectionName = "system.serviceModel/bindings/amqpBinaryBinding"; - internal const string AmqpTransportSectionName = "amqpTransport"; - } - - static class AmqpConfigurationStrings - { - public const string BrokerHost = "host"; - public const string BrokerPort = "port"; - public const string TransferMode = "transferMode"; - public const string Brokers = "brokers"; - public const string Shared = "shared"; - public const string PrefetchLimit = "prefetchLimit"; - public const string MaxBufferPoolSize = "maxBufferPoolSize"; - public const string MaxReceivedMessageSize = "maxReceivedMessageSize"; - public const string Security = "security"; - public const string SecurityMode = "mode"; - public const string SecurityTransport = "transport"; - public const string SecurityTransportCredentialType = "credentialType"; - public const string SecurityTransportUseSSL = "useSSL"; - public const string SecurityTransportDefaultCredential = "defaultCredential"; - public const string SecurityTransportIgnoreEndpointCredentials = "ignoreEndpointCredentials"; - public const string CredentialUserName = "userName"; - public const string CredentialPassword = "password"; - } - - static class AmqpDefaults - { - internal const string BrokerHost = "localhost"; - internal const int BrokerPort = 5672; - internal const TransferMode TransferMode = System.ServiceModel.TransferMode.Buffered; - internal const long MaxBufferPoolSize = 64 * 1024; - internal const int MaxReceivedMessageSize = 5 * 1024 * 1024; //64 * 1024; - } - - // parking spot for properties that may be shared by separate channels on a single AMQP connection - internal class AmqpChannelProperties - { - string brokerHost; - int brokerPort; - TransferMode transferMode; - AmqpProperties defaultMessageProperties; - AmqpSecurityMode amqpSecurityMode; - AmqpTransportSecurity amqpTransportSecurity; - AmqpCredential amqpCredential; - long maxBufferPoolSize; - int maxReceivedMessageSize; - - internal AmqpChannelProperties() - { - this.brokerHost = AmqpDefaults.BrokerHost; - this.brokerPort = AmqpDefaults.BrokerPort; - this.transferMode = AmqpDefaults.TransferMode; - this.defaultMessageProperties = null; - this.amqpSecurityMode = AmqpSecurityMode.None; - this.amqpTransportSecurity = null; - this.amqpCredential = null; - this.maxBufferPoolSize = AmqpDefaults.MaxBufferPoolSize; - this.maxReceivedMessageSize = AmqpDefaults.MaxReceivedMessageSize; - } - - public AmqpChannelProperties Clone() - { - AmqpChannelProperties props = (AmqpChannelProperties) this.MemberwiseClone(); - if (this.defaultMessageProperties != null) - { - props.defaultMessageProperties = this.defaultMessageProperties.Clone(); - } - - if (this.amqpTransportSecurity != null) - { - props.amqpTransportSecurity = this.amqpTransportSecurity.Clone(); - } - - if (this.amqpCredential != null) - { - this.amqpCredential = this.amqpCredential.Clone(); - } - - return props; - } - - internal string BrokerHost - { - get { return this.brokerHost; } - set { this.brokerHost = value; } - } - - internal int BrokerPort - { - get { return this.brokerPort; } - set { this.brokerPort = value; } - } - - internal TransferMode TransferMode - { - get { return this.transferMode; } - set { this.transferMode = value; } - } - - internal AmqpProperties DefaultMessageProperties - { - get { return this.defaultMessageProperties; } - set { this.defaultMessageProperties = value; } - } - - internal AmqpSecurityMode AmqpSecurityMode - { - get { return this.amqpSecurityMode; } - set { this.amqpSecurityMode = value; } - } - - internal AmqpTransportSecurity AmqpTransportSecurity - { - get { return this.amqpTransportSecurity; } - set { this.amqpTransportSecurity = value; } - } - - internal AmqpCredential AmqpCredential - { - get { return this.amqpCredential; } - set { this.amqpCredential = value; } - } - - internal long MaxBufferPoolSize - { - get { return this.maxBufferPoolSize; } - set { this.maxBufferPoolSize = value; } - } - - internal int MaxReceivedMessageSize - { - get { return this.maxReceivedMessageSize; } - set { this.maxReceivedMessageSize = value; } - } - } - - static class AmqpChannelHelpers - { - internal static void ValidateTimeout(TimeSpan timeout) - { - if (timeout < TimeSpan.Zero) - { - throw new ArgumentOutOfRangeException("timeout", timeout, "Timeout must be greater than or equal to TimeSpan.Zero. To disable timeout, specify TimeSpan.MaxValue."); - } - } - - internal static void FindAuthenticationCredentials(AmqpChannelProperties channelProperties, - BindingContext bindingContext) - { - AmqpTransportSecurity tsec = channelProperties.AmqpTransportSecurity; - if (tsec == null) - { - // no auth - return; - } - - if (tsec.CredentialType == AmqpCredentialType.Anonymous) - { - // no auth - return; - } - - // credentials search order: specific AmqpCredentials, specific - // ClientCredentials (if applicable), binding's default credentials - - AmqpCredential amqpCred = bindingContext.BindingParameters.Find(); - if (amqpCred != null) - { - channelProperties.AmqpCredential = amqpCred.Clone(); - return; - } - - if (!tsec.IgnoreEndpointClientCredentials) - { - ClientCredentials cliCred = bindingContext.BindingParameters.Find(); - if (cliCred != null) - { - if (cliCred.UserName != null) - { - if (cliCred.UserName.UserName != null) - { - channelProperties.AmqpCredential = new AmqpCredential(cliCred.UserName.UserName, - cliCred.UserName.Password); - return; - } - } - } - } - - if (tsec.DefaultCredential != null) - { - channelProperties.AmqpCredential = tsec.DefaultCredential.Clone(); - } - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs deleted file mode 100644 index 78655f2124..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs +++ /dev/null @@ -1,204 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.ServiceModel; - using System.ServiceModel.Channels; - using System.Threading; - using System.Collections.Generic; - using System.Collections.ObjectModel; - - class AmqpChannelListener : ChannelListenerBase - { - MessageEncoderFactory messageEncoderFactory; - AmqpTransportBindingElement bindingElement; - AmqpChannelProperties channelProperties; - BindingContext bindingContext; - bool shared; - int prefetchLimit; - long maxBufferPoolSize; - Uri uri; - AmqpTransportChannel amqpTransportChannel; - delegate IInputChannel AsyncOnAcceptCaller (TimeSpan timeout); - AsyncOnAcceptCaller asyncOnAcceptCaller; - ManualResetEvent acceptWaitEvent; - - internal AmqpChannelListener(AmqpTransportBindingElement bindingElement, BindingContext context) - : base(context.Binding) - { - this.bindingElement = bindingElement; - this.channelProperties = bindingElement.ChannelProperties.Clone(); - this.bindingContext = context; - this.shared = bindingElement.Shared; - this.prefetchLimit = bindingElement.PrefetchLimit; - - this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize; - - // TODO: review this. Should be unique hostname based - this.uri = context.ListenUriBaseAddress; - this.asyncOnAcceptCaller = new AsyncOnAcceptCaller(this.OnAcceptChannel); - this.acceptWaitEvent = new ManualResetEvent(false); - - Collection messageEncoderBindingElements - = context.BindingParameters.FindAll(); - - if(messageEncoderBindingElements.Count > 1) - { - throw new InvalidOperationException("More than one MessageEncodingBindingElement was found in the BindingParameters of the BindingContext"); - } - else if (messageEncoderBindingElements.Count == 1) - { - this.messageEncoderFactory = messageEncoderBindingElements[0].CreateMessageEncoderFactory(); - } - else - { - this.messageEncoderFactory = new TextMessageEncodingBindingElement().CreateMessageEncoderFactory(); - } - } - - public override Uri Uri - { - get - { - return this.uri; - } - } - - - - public override T GetProperty() - { - T mep = messageEncoderFactory.Encoder.GetProperty(); - if (mep != null) - { - return mep; - } - - if (typeof(T) == typeof(MessageVersion)) - { - return (T)(object)messageEncoderFactory.Encoder.MessageVersion; - } - - return base.GetProperty(); - } - - protected override void OnOpen(TimeSpan timeout) - { - // check and freeze security properties now - AmqpSecurityMode mode = AmqpSecurityMode.None; - if (this.bindingElement.BindingSecurity != null) - { - mode = bindingElement.BindingSecurity.Mode; - } - - this.channelProperties.AmqpSecurityMode = mode; - if (mode == AmqpSecurityMode.None) - { - return; - } - - AmqpChannelHelpers.FindAuthenticationCredentials(this.channelProperties, this.bindingContext); - } - - protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) - { - throw new NotImplementedException("AmqpChannelListener OnBeginOpen"); - //// return null; - } - - protected override void OnEndOpen(IAsyncResult result) - { - throw new NotImplementedException("AmqpChannelListener OnEndOpen"); - } - - protected override bool OnWaitForChannel(TimeSpan timeout) - { - throw new NotImplementedException("AmqpChannelListener OnWaitForChannel"); - } - - protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state) - { - throw new NotImplementedException("AmqpChannelListener OnBeginWaitForChannel"); - } - - protected override bool OnEndWaitForChannel(IAsyncResult result) - { - throw new NotImplementedException("AmqpChannelListener OnEndWaitForChannel"); - } - - protected override IInputChannel OnAcceptChannel(TimeSpan timeout) - { - if (this.IsDisposed) - { - return null; - } - - if (amqpTransportChannel == null) - { - // TODO: add timeout processing - amqpTransportChannel = new AmqpTransportChannel(this, this.channelProperties, - new EndpointAddress(uri), messageEncoderFactory.Encoder, - maxBufferPoolSize, this.shared, this.prefetchLimit); - return (IInputChannel)(object)amqpTransportChannel; - } - - // Singleton channel. Subsequent Accepts wait until the listener is closed - acceptWaitEvent.WaitOne(); - return null; - } - - protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state) - { - return asyncOnAcceptCaller.BeginInvoke(timeout, callback, state); - } - - protected override IInputChannel OnEndAcceptChannel(IAsyncResult result) - { - return asyncOnAcceptCaller.EndInvoke(result); - } - - protected override void OnClose(TimeSpan timeout) - { - if (amqpTransportChannel != null) - { - amqpTransportChannel.Close(); - } - acceptWaitEvent.Set(); - } - - protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) - { - throw new NotImplementedException("AmqpChannelListener OnBeginClose"); - } - - protected override void OnEndClose(IAsyncResult result) - { - throw new NotImplementedException("AmqpChannelListener OnEndClose"); - } - - protected override void OnAbort() - { - if (amqpTransportChannel != null) - amqpTransportChannel.Abort(); - acceptWaitEvent.Set(); - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredential.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredential.cs deleted file mode 100644 index e2da07c800..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredential.cs +++ /dev/null @@ -1,113 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -/* - * AMQP has a SASL authentication mechanism that doesn't match - * with existing .NET credentials. The analogy breaks down further - * if there is a list of brokers to cycle through on failover. - * This class will allow arbitrary credentials to be specified - * by the user, but is meant to be sensibly populated by bindings - * that use it from ClientCredentials. - * See the related interplay of ClientCredentials and - * WebProxy NetworkCredential for the BasicHttpBinding. - */ - -namespace Apache.Qpid.Channel -{ - using System; - - /// - /// Credentials for establishing a connection to an AMQP server. - /// - public class AmqpCredential - { - private string password; - private string userName; // SASL authentication id - // Future: private string the_Sasl_Authorization_ID - // Future: private X509CertificateInitiatorClientCredential tlsClientCertificate; - - public AmqpCredential(string userName, string password) - { - if (userName == null) - { - throw new ArgumentNullException("user name"); - } - - if (password == null) - { - throw new ArgumentNullException("password"); - } - - this.userName = userName; - this.password = password; - } - - public string UserName - { - get - { - if (this.userName == null) - { - this.userName = ""; - } - - return this.userName; - } - - set - { - if (value == null) - { - throw new ArgumentNullException("user name"); - } - - this.userName = value; - } - } - - public string Password - { - get - { - if (this.password == null) - { - this.password = ""; - } - - return this.password; - } - - set - { - if (value == null) - { - throw new ArgumentNullException("password"); - } - - this.password = value; - } - } - - public AmqpCredential Clone() - { - return (AmqpCredential) this.MemberwiseClone(); - } - } - -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs deleted file mode 100644 index 2bafbbb54e..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs +++ /dev/null @@ -1,37 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - /// - /// Enumerates the SASL authentication mechanisms used by the AMQP transport - /// - public enum AmqpCredentialType - { - /// - /// SASL ANONYMOUS mechanism - /// - Anonymous, - - /// - /// SASL PLAIN mechanism: username and password - /// - Plain - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs deleted file mode 100644 index 5d88afb88f..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs +++ /dev/null @@ -1,75 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - - /// - /// Specifies the types of trasport-level and message-level security used by - /// an endpoint configured with an AmqpBinding. - /// - public sealed class AmqpSecurity - { - private AmqpSecurityMode mode; - private AmqpTransportSecurity transportSecurity; - - internal AmqpSecurity() - { - this.mode = AmqpSecurityMode.None; - } - - internal AmqpSecurity(AmqpTransportSecurity tsec) - { - if (tsec == null) - { - throw new ArgumentNullException("AmqpTransportSecurity"); - } - - this.mode = AmqpSecurityMode.Transport; - this.transportSecurity = tsec; - } - - /// - /// gets or sets the security mode. - /// - public AmqpSecurityMode Mode - { - get { return this.mode; } - set {this.mode = value; } - } - - /// - /// gets the security object that controls encryption - /// and authentication parameters for the AMQP transport. - /// - public AmqpTransportSecurity Transport - { - get - { - if (this.transportSecurity == null) - { - this.transportSecurity = new AmqpTransportSecurity(); - } - - return this.transportSecurity; - } - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurityElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurityElement.cs deleted file mode 100644 index f7370e40f5..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurityElement.cs +++ /dev/null @@ -1,126 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.Collections.Generic; - using System.Collections.ObjectModel; - using System.Configuration; - using System.ServiceModel; - using System.ServiceModel.Channels; - using System.ServiceModel.Configuration; - using Apache.Qpid.AmqpTypes; - - public sealed class AmqpSecurityElement : ConfigurationElement - { - public AmqpSecurityElement() - { - Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.SecurityMode, - typeof(AmqpSecurityMode), AmqpSecurityMode.None, null, null, ConfigurationPropertyOptions.None)); - Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.SecurityTransport, - typeof(AmqpTransportSecurityElement), null)); - - } - - [ConfigurationProperty(AmqpConfigurationStrings.SecurityMode, DefaultValue = AmqpSecurityMode.None)] - public AmqpSecurityMode Mode - { - get { return (AmqpSecurityMode)base[AmqpConfigurationStrings.SecurityMode]; } - set { base[AmqpConfigurationStrings.SecurityMode] = value; } - } - - [ConfigurationProperty(AmqpConfigurationStrings.SecurityTransport)] - public AmqpTransportSecurityElement Transport - { - get { return (AmqpTransportSecurityElement)base[AmqpConfigurationStrings.SecurityTransport]; } - set { base[AmqpConfigurationStrings.SecurityTransport] = value; } - } - } - - public class AmqpTransportSecurityElement : ConfigurationElement - { - public AmqpTransportSecurityElement() - { - Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportCredentialType, - typeof(AmqpCredentialType), AmqpCredentialType.Anonymous, null, null, ConfigurationPropertyOptions.None)); - Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportUseSSL, - typeof(bool), false, null, null, ConfigurationPropertyOptions.None)); - Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportDefaultCredential, - typeof(AmqpCredentialElement), null)); - Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportIgnoreEndpointCredentials, - typeof(bool), false, null, null, ConfigurationPropertyOptions.None)); - - } - - [ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportCredentialType, DefaultValue = AmqpCredentialType.Anonymous)] - public AmqpCredentialType CredentialType - { - get { return (AmqpCredentialType)base[AmqpConfigurationStrings.SecurityTransportCredentialType]; } - set { base[AmqpConfigurationStrings.SecurityTransportCredentialType] = value; } - } - - [ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportUseSSL, DefaultValue = false)] - public bool UseSSL - { - get { return (bool)base[AmqpConfigurationStrings.SecurityTransportUseSSL]; } - set { base[AmqpConfigurationStrings.SecurityTransportUseSSL] = value; } - } - - [ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportDefaultCredential, DefaultValue = null)] - public AmqpCredentialElement DefaultCredential - { - get { return (AmqpCredentialElement)base[AmqpConfigurationStrings.SecurityTransportDefaultCredential]; } - set { base[AmqpConfigurationStrings.SecurityTransportDefaultCredential] = value; } - } - - [ConfigurationProperty(AmqpConfigurationStrings.SecurityTransportIgnoreEndpointCredentials, DefaultValue = false)] - public bool IgnoreEndpointCredentials - { - get { return (bool)base[AmqpConfigurationStrings.SecurityTransportIgnoreEndpointCredentials]; } - set { base[AmqpConfigurationStrings.SecurityTransportIgnoreEndpointCredentials] = value; } - } - } - - public class AmqpCredentialElement : ConfigurationElement - { - public AmqpCredentialElement() - { - Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.CredentialUserName, - typeof(string), "", null, null, ConfigurationPropertyOptions.None)); - Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.CredentialPassword, - typeof(string), "", null, null, ConfigurationPropertyOptions.None)); - - } - - [ConfigurationProperty(AmqpConfigurationStrings.CredentialUserName, DefaultValue = "")] - public string UserName - { - get { return (string)base[AmqpConfigurationStrings.CredentialUserName]; } - set { base[AmqpConfigurationStrings.CredentialUserName] = value; } - } - - [ConfigurationProperty(AmqpConfigurationStrings.CredentialPassword, DefaultValue = "")] - public string Password - { - get { return (string)base[AmqpConfigurationStrings.CredentialPassword]; } - set { base[AmqpConfigurationStrings.CredentialPassword] = value; } - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurityMode.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurityMode.cs deleted file mode 100644 index 88e7add054..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurityMode.cs +++ /dev/null @@ -1,37 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - /// - /// Specifies whether trasport-level security is used with AMQP connections - /// - public enum AmqpSecurityMode - { - /// - /// Indicates no security is used with the AMQP transport - /// - None, - - /// - /// Indicates transport-level security is used with the AMQP transport - /// - Transport - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs deleted file mode 100644 index a98f361d19..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs +++ /dev/null @@ -1,186 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.ServiceModel; - using System.ServiceModel.Channels; - using System.ServiceModel.Description; - using Apache.Qpid.AmqpTypes; - - public class AmqpTransportBindingElement : TransportBindingElement, ITransactedBindingElement - { - AmqpChannelProperties channelProperties; - bool shared; - int prefetchLimit; - AmqpSecurity bindingSecurity; - - public AmqpTransportBindingElement() - { - // start with default properties - channelProperties = new AmqpChannelProperties(); - } - - protected AmqpTransportBindingElement(AmqpTransportBindingElement other) - : base(other) - { - this.channelProperties = other.channelProperties.Clone(); - this.shared = other.shared; - this.prefetchLimit = other.prefetchLimit; - this.bindingSecurity = other.bindingSecurity; - } - - internal AmqpSecurity BindingSecurity - { - get { return this.bindingSecurity; } - set { this.bindingSecurity = value; } - } - - public override IChannelFactory BuildChannelFactory(BindingContext context) - { - if (context == null) - { - throw new ArgumentNullException("context"); - } - - return (IChannelFactory)(object)new AmqpChannelFactory(this, context); - } - - public override IChannelListener BuildChannelListener(BindingContext context) - { - if (context == null) - { - throw new ArgumentNullException("context"); - } - - return (IChannelListener)(object)new AmqpChannelListener(this, context); - } - - - - public override bool CanBuildChannelFactory(BindingContext context) - { - return ((typeof(TChannel) == typeof(IOutputChannel)) || - (typeof(TChannel) == typeof(IInputChannel))); - } - - public override bool CanBuildChannelListener(BindingContext context) - { - return ((typeof(TChannel) == typeof(IInputChannel))); - } - - public override BindingElement Clone() - { - return new AmqpTransportBindingElement(this); - } - - internal AmqpChannelProperties ChannelProperties - { - get { return channelProperties; } - } - - public AmqpCredential AmqpCredential - { - get { return this.channelProperties.AmqpCredential; } - set { this.channelProperties.AmqpCredential = value; } - } - - public string BrokerHost - { - get { return this.channelProperties.BrokerHost; } - set { this.channelProperties.BrokerHost = value; } - } - - public int BrokerPort - { - get { return this.channelProperties.BrokerPort; } - set { this.channelProperties.BrokerPort = value; } - } - - public int PrefetchLimit - { - get { return this.prefetchLimit; } - set { this.prefetchLimit = value; } - } - - public bool Shared - { - get { return this.shared; } - set { this.shared = value; } - } - - public bool TransactedReceiveEnabled - { - get { return true; } - } - - public TransferMode TransferMode - { - get { return this.channelProperties.TransferMode; } - set { this.channelProperties.TransferMode = value; } - } - - public AmqpTransportSecurity TransportSecurity - { - get - { - if (this.channelProperties.AmqpTransportSecurity == null) - { - this.channelProperties.AmqpTransportSecurity = new AmqpTransportSecurity(); - } - - return this.channelProperties.AmqpTransportSecurity; - } - } - - - public AmqpProperties DefaultMessageProperties - { - get { return this.channelProperties.DefaultMessageProperties; } - - set { this.channelProperties.DefaultMessageProperties = value; } - } - - public override T GetProperty(BindingContext context) - { - if (context == null) - { - throw new ArgumentNullException("context"); - } - - if (typeof(T) == typeof(MessageVersion)) - { - return (T)(object)MessageVersion.Default; - } - - - return context.GetInnerProperty(); - } - - public override string Scheme - { - get - { - return AmqpConstants.Scheme; - } - } - - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs deleted file mode 100644 index 6f0ffd9815..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs +++ /dev/null @@ -1,642 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -// TODO: flow control -// timeout handling -// transactions -// check if should split into separate input and output classes (little overlap) - -namespace Apache.Qpid.Channel -{ - using System; - using System.Collections; - using System.Collections.Generic; - using System.ServiceModel; - using System.ServiceModel.Channels; - using System.Text; - using System.Threading; - using System.Globalization; - using System.Web; - using System.Xml; - - // the thin interop layer that provides access to the Qpid AMQP client libraries - using Apache.Qpid.Interop; - using Apache.Qpid.AmqpTypes; - - /// - /// WCF client transport channel for accessing AMQP brokers using the Qpid C++ library - /// - public class AmqpTransportChannel : ChannelBase, IOutputChannel, IInputChannel - { - private static readonly EndpointAddress AnonymousAddress = - new EndpointAddress("http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous"); - - private EndpointAddress remoteAddress; - private MessageEncoder encoder; - private AmqpChannelProperties factoryChannelProperties; - private bool shared; - private int prefetchLimit; - private string encoderContentType; - // AMQP subject/routing key - private string subject; - // Qpid addressing value for "qpid.subject" property - private string qpidSubject; - - private BufferManager bufferManager; - private AmqpProperties outputMessageProperties; - - private InputLink inputLink; - private OutputLink outputLink; - - private bool isInputChannel; - private bool streamed; - - private AsyncTimeSpanCaller asyncOpenCaller; - private AsyncTimeSpanCaller asyncCloseCaller; - - internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection, int prefetchLimit) - : base(factory) - { - this.isInputChannel = (factory is ChannelListenerBase) || (factory is AmqpChannelFactory); - - if (remoteAddress == null) - { - throw new ArgumentException("Null Endpoint Address"); - } - - this.factoryChannelProperties = channelProperties; - this.shared = sharedConnection; - this.prefetchLimit = prefetchLimit; - this.remoteAddress = remoteAddress; - - // pull out host, port, queue, and connection arguments - string qpidAddress = this.UriToQpidAddress(remoteAddress.Uri, out subject); - - this.encoder = msgEncoder; - string ct = String.Empty; - if (this.encoder != null) - { - ct = this.encoder.ContentType; - if (ct != null) - { - int pos = ct.IndexOf(';'); - if (pos != -1) - { - ct = ct.Substring(0, pos).Trim(); - } - } - else - { - ct = "application/octet-stream"; - } - } - - this.encoderContentType = ct; - - if (this.factoryChannelProperties.TransferMode == TransferMode.Streamed) - { - this.streamed = true; - } - else - { - if (!(this.factoryChannelProperties.TransferMode == TransferMode.Buffered)) - { - throw new ArgumentException("TransferMode mode must be \"Streamed\" or \"Buffered\""); - } - - this.streamed = false; - } - - this.bufferManager = BufferManager.CreateBufferManager(maxBufferPoolSize, int.MaxValue); - - this.asyncOpenCaller = new AsyncTimeSpanCaller(this.OnOpen); - this.asyncCloseCaller = new AsyncTimeSpanCaller(this.OnClose); - - if (this.isInputChannel) - { - this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, qpidAddress); - this.inputLink.PrefetchLimit = this.prefetchLimit; - } - else - { - this.outputLink = ConnectionManager.GetOutputLink(this.factoryChannelProperties, shared, false, qpidAddress); - this.subject = this.outputLink.DefaultSubject; - this.qpidSubject = this.outputLink.QpidSubject; - } - } - - private delegate bool AsyncTryReceiveCaller(TimeSpan timeout, out Message message); - - private delegate void AsyncTimeSpanCaller(TimeSpan timeout); - - EndpointAddress IOutputChannel.RemoteAddress - { - get - { - return this.remoteAddress; - } - } - - // i.e what you would insert into a ReplyTo header to reach - // here. Presumably should be exchange/link and routing info, - // rather than the actual input queue name. - EndpointAddress IInputChannel.LocalAddress - { - get - { - // TODO: something better - return AnonymousAddress; - } - } - - AmqpProperties OutputMessageProperties - { - get - { - if (this.outputMessageProperties == null) - { - this.outputMessageProperties = this.factoryChannelProperties.DefaultMessageProperties; - if (this.outputMessageProperties == null) - { - this.outputMessageProperties = new AmqpProperties(); - } - } - - return this.outputMessageProperties; - } - } - - Uri IOutputChannel.Via - { - get - { - return this.remoteAddress.Uri; - } - } - - public override T GetProperty() - { - if (typeof(T) == typeof(IInputChannel)) - { - if (this.isInputChannel) - { - return (T)(object)this; - } - } - else if (typeof(T) == typeof(IOutputChannel)) - { - if (!this.isInputChannel) - { - return (T)(object)this; - } - } - - return base.GetProperty(); - } - - public void Send(Message message, TimeSpan timeout) - { - this.ThrowIfDisposedOrNotOpen(); - AmqpChannelHelpers.ValidateTimeout(timeout); - - try - { - using (AmqpMessage amqpMessage = this.WcfToQpid(message)) - { - this.outputLink.Send(amqpMessage, timeout); - } - } - finally - { - message.Close(); - } - } - - public void Send(Message message) - { - this.Send(message, this.DefaultSendTimeout); - } - - public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state) - { - this.ThrowIfDisposedOrNotOpen(); - AmqpChannelHelpers.ValidateTimeout(timeout); - - try - { - using (AmqpMessage amqpMessage = this.WcfToQpid(message)) - { - return this.outputLink.BeginSend(amqpMessage, timeout, callback, state); - } - } - finally - { - message.Close(); - } - } - - public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state) - { - return this.BeginSend(message, this.DefaultSendTimeout, callback, state); - } - - public void EndSend(IAsyncResult result) - { - this.outputLink.EndSend(result); - } - - public Message Receive(TimeSpan timeout) - { - Message message; - if (this.TryReceive(timeout, out message)) - { - return message; - } - else - { - throw new TimeoutException("Receive"); - } - } - - public Message Receive() - { - return this.Receive(this.DefaultReceiveTimeout); - } - - public bool TryReceive(TimeSpan timeout, out Message message) - { - AmqpMessage amqpMessage; - message = null; - - if (this.inputLink.TryReceive(timeout, out amqpMessage)) - { - message = this.QpidToWcf(amqpMessage); - return true; - } - - return false; - } - - public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state) - { - return this.inputLink.BeginTryReceive(timeout, callback, state); - } - - public bool EndTryReceive(IAsyncResult result, out Message message) - { - AmqpMessage amqpMessage = null; - if (!this.inputLink.EndTryReceive(result, out amqpMessage)) - { - message = null; - return false; - } - message = QpidToWcf(amqpMessage); - return true; - } - - public bool WaitForMessage(TimeSpan timeout) - { - return this.inputLink.WaitForMessage(timeout); - } - - public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state) - { - return this.inputLink.BeginTryReceive(timeout, callback, state); - } - - public IAsyncResult BeginReceive(AsyncCallback callback, object state) - { - return this.BeginReceive(this.DefaultReceiveTimeout, callback, state); - } - - public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state) - { - return this.inputLink.BeginWaitForMessage(timeout, callback, state); - } - - public Message EndReceive(IAsyncResult result) - { - Message message; - if (this.EndTryReceive(result, out message)) - { - return message; - } - else - { - throw new TimeoutException("EndReceive"); - } - } - - public bool EndWaitForMessage(IAsyncResult result) - { - return this.inputLink.EndWaitForMessage(result); - } - - public void CloseEndPoint() - { - if (this.inputLink != null) - { - this.inputLink.Close(); - } - if (this.outputLink != null) - { - this.outputLink.Close(); - } - } - - /// - /// Open connection to Broker - /// - protected override void OnOpen(TimeSpan timeout) - { - // TODO: move open logic to here from constructor - } - - protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) - { - return this.asyncOpenCaller.BeginInvoke(timeout, callback, state); - } - - protected override void OnEndOpen(IAsyncResult result) - { - this.asyncOpenCaller.EndInvoke(result); - } - - protected override void OnAbort() - { - //// TODO: check for network-less qpid teardown or launch special thread - this.CloseEndPoint(); - this.Cleanup(); - } - - /// - /// Shutdown gracefully - /// - protected override void OnClose(TimeSpan timeout) - { - this.CloseEndPoint(); - this.Cleanup(); - } - - protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) - { - return this.asyncCloseCaller.BeginInvoke(timeout, callback, state); - } - - protected override void OnEndClose(IAsyncResult result) - { - this.asyncCloseCaller.EndInvoke(result); - } - - private AmqpMessage WcfToQpid(Message wcfMessage) - { - object obj; - AmqpProperties applicationProperties = null; - bool success = false; - AmqpMessage amqpMessage = null; - - if (wcfMessage.Properties.TryGetValue("AmqpProperties", out obj)) - { - applicationProperties = obj as AmqpProperties; - } - - try - { - AmqpProperties outgoingProperties = new AmqpProperties(); - - // Start with AMQP properties from the binding and the URI - if (this.factoryChannelProperties.DefaultMessageProperties != null) - { - outgoingProperties.MergeFrom(this.factoryChannelProperties.DefaultMessageProperties); - } - - if (this.subject != null) - { - outgoingProperties.RoutingKey = this.subject; - } - - if (this.qpidSubject != null) - { - outgoingProperties.PropertyMap["qpid.subject"] = new AmqpString(this.qpidSubject); - } - - // Add the Properties set by the application on this particular message. - // Application properties trump channel properties - if (applicationProperties != null) - { - outgoingProperties.MergeFrom(applicationProperties); - } - - amqpMessage = this.outputLink.CreateMessage(); - amqpMessage.Properties = outgoingProperties; - - // copy the WCF message body to the AMQP message body - if (this.streamed) - { - this.encoder.WriteMessage(wcfMessage, amqpMessage.BodyStream); - } - else - { - ArraySegment encodedBody = this.encoder.WriteMessage(wcfMessage, int.MaxValue, this.bufferManager); - try - { - amqpMessage.BodyStream.Write(encodedBody.Array, encodedBody.Offset, encodedBody.Count); - } - finally - { - this.bufferManager.ReturnBuffer(encodedBody.Array); - } - } - - success = true; - } - finally - { - if (!success && (amqpMessage != null)) - { - amqpMessage.Dispose(); - } - } - return amqpMessage; - } - - - private Message QpidToWcf(AmqpMessage amqpMessage) - { - if (amqpMessage == null) - { - return null; - } - - Message wcfMessage = null; - byte[] managedBuffer = null; - - try - { - if (this.streamed) - { - wcfMessage = this.encoder.ReadMessage(amqpMessage.BodyStream, int.MaxValue); - } - else - { - int count = (int)amqpMessage.BodyStream.Length; - managedBuffer = this.bufferManager.TakeBuffer(count); - int nr = amqpMessage.BodyStream.Read(managedBuffer, 0, count); - ArraySegment bufseg = new ArraySegment(managedBuffer, 0, count); - - wcfMessage = this.encoder.ReadMessage(bufseg, this.bufferManager); - - // set to null for finally{} block, since the encoder is now responsible for - // returning the BufferManager memory - managedBuffer = null; - } - - // This message will be discarded unless the "To" header matches - // the WCF endpoint dispatcher's address filter (or the service is - // AddressFilterMode=AddressFilterMode.Any). - - this.remoteAddress.ApplyTo(wcfMessage); - - if (amqpMessage.Properties != null) - { - wcfMessage.Properties.Add("AmqpProperties", amqpMessage.Properties); - } - } - catch (XmlException xmlException) - { - throw new ProtocolException( - "There is a problem with the XML that was received from the network. See inner exception for more details.", - xmlException); - } - catch (Exception e) - { - // TODO: logging - Console.WriteLine("TX channel encoder exception " + e); - } - finally - { - // close the amqpMessage unless the body will be read at a later time. - if (!this.streamed || wcfMessage == null) - { - amqpMessage.Close(); - } - - // the handoff to the encoder failed - if (managedBuffer != null) - { - this.bufferManager.ReturnBuffer(managedBuffer); - } - } - - return wcfMessage; - } - - private void Cleanup() - { - this.bufferManager.Clear(); - } - - private string UriToQpidAddress(Uri uri, out string subject) - { - if (uri.Scheme != AmqpConstants.Scheme) - { - throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, - "The scheme {0} specified in address is not supported.", uri.Scheme), "uri"); - } - - subject = ""; - string path = uri.LocalPath; - string query = uri.Query; - - // legacy... convert old style myqueue?routingkey=key to myqueue/key - - if (query.Length > 0) - { - if (!query.StartsWith("?")) - { - throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, - "Invalid query argument."), "uri"); - } - - string routingParseKey = "routingkey="; - string subjectParseKey = "subject="; - char[] charSeparators = new char[] { '?', ';' }; - string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries); - foreach (string s in args) - { - if (s.StartsWith(routingParseKey)) - { - subject = s.Substring(routingParseKey.Length); - } - else if (s.StartsWith(subjectParseKey)) - { - subject = s.Substring(subjectParseKey.Length); - } - else - { - if (s.Length > 0) - { - throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, - "Invalid query argument {0}.", s), "uri"); - } - } - } - - if (path.Contains("/")) - { - throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, - "Invalid queue name {0}.", path), "uri"); - } - - if (path.Length == 0) - { - // special case, user wants default exchange - return "//" + subject; - } - - return path + "/" + subject; - } - - // find subject in "myqueue/mysubject;{mode:browse}" - int pos = path.IndexOf('/'); - if ((pos > -1) && (pos < path.Length + 1)) - { - subject = path.Substring(pos); - pos = subject.IndexOf(';'); - if (pos == 0) - { - throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, - "Empty subject in address {0}.", path), "uri"); - } - - if (pos > 0) - { - subject = subject.Substring(0, pos); - } - } - - if (subject.Length > 0) - { - subject = HttpUtility.UrlDecode(subject); - } - - return HttpUtility.UrlDecode(path); - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs deleted file mode 100644 index b722983ead..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs +++ /dev/null @@ -1,101 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - /// - /// This class is used by the AMQP Transport to set transport-level security settings for a binding - /// - public sealed class AmqpTransportSecurity - { - private AmqpCredentialType credentialType; - - // WCF frowns on unencrypted credentials on the wire, but AMQP is agnostic. - // For interoperability, allow SSL to be turned on/off independentaly. - private bool useSSL; - - // Allow per channel credentials, but also ease the common case where - // credentials are shared and wish to be globally set in a config file. - private AmqpCredential defaultCredential; - - // if true, do not look at context for ServiceModel.Description.ClientCredentials. - // ClientCredentials will be place of choice for WCF traditionalists - // to specify auth tokens to the AMQP server when Windows and SASL tokens - // look the same. At other times it makes no sense and sometimes it is - // confusing with Message-level credentials. - private bool ignoreEndpointClientCredentials; - - - internal AmqpTransportSecurity() - { - this.credentialType = AmqpCredentialType.Anonymous; - this.useSSL = true; - } - - /// - /// gets or sets the SASL mechanism for AMQP authentication between client and server. - /// - public AmqpCredentialType CredentialType - { - get { return this.credentialType; } - - set { this.credentialType = value; } - } - - /// - /// gets or sets the flag that controls the use of SSL encryption - /// over the network connection. - /// - public bool UseSSL - { - get { return this.useSSL; } - set { this.useSSL = value; } - } - - /// - /// gets the default credential object for authentication with the AMQP server. - /// - public AmqpCredential DefaultCredential - { - get { return this.defaultCredential; } - set { this.defaultCredential = value; } - } - - /// - /// gets or sets the endpoint ClientCredentials search parameter. If true, - /// only AmqpCredential objects are searched for in the surrounding context. - /// - public bool IgnoreEndpointClientCredentials - { - get { return this.ignoreEndpointClientCredentials; } - set { this.ignoreEndpointClientCredentials = value; } - } - - internal AmqpTransportSecurity Clone() - { - AmqpTransportSecurity sec = (AmqpTransportSecurity)this.MemberwiseClone(); - if (this.defaultCredential != null) - { - sec.defaultCredential = this.defaultCredential.Clone(); - } - - return sec; - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj deleted file mode 100644 index 1eb811b425..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj +++ /dev/null @@ -1,112 +0,0 @@ - - - - - Debug - AnyCPU - 9.0.21022 - 2.0 - {8AABAB30-7D1E-4539-B7D1-05450262BAD2} - Library - Properties - Apache.Qpid.Channel - Apache.Qpid.Channel - v3.5 - 512 - - - true - ..\..\..\wcfnet.snk - - - true - full - false - bin\Debug\ - DEBUG;TRACE - prompt - 4 - - - pdbonly - true - bin\Release\ - TRACE - prompt - 4 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 3.0 - - - 3.0 - - - - - - - - {C9B6AC75-6332-47A4-B82B-0C20E0AF2D34} - Interop - - - - - - \ No newline at end of file diff --git a/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs b/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs deleted file mode 100644 index 7238ff2120..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs +++ /dev/null @@ -1,329 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.Collections; - using System.Collections.Generic; - using System.Text; - using System.Threading; - - using Apache.Qpid.Interop; - - // The ConnectionManager looks after a shareable pool of AmqpConnection and AmqpSession - // objects. If two connection requests could be shared (see MakeKey() properties), and - // are designated as shareable, then they will be paired up. Each shared connection is - // a separate instance of a ManagedConnection. All unshared connections use a single - // instance of ManagedConnection with locking turned off. The ManagedConnection object - // registers for notifictation when a connection goes idle (all grandchild InputLink and - // OutputLink objects have been closed), and closes the connection. - - // TODO: the session sharing is roughed-in via comments but needs completing. - - internal sealed class ConnectionManager - { - // A side effect of creating InputLinks and OutputLinks is that counters - // in the respective AmqpSession and AmqpConnection are updated, so care - // must be taken to hold the lock across acquiring a session and opening - // a link on it. - - // one for each shared connection - private static Dictionary sharedInstances; - - // this one creates and releases connections that are not shared. No locking required. - private static ManagedConnection unsharedInstance; - - // lock for finding or creating ManagedConnection instances - private static Object connectionLock; - - static ConnectionManager() - { - unsharedInstance = null; - sharedInstances = new Dictionary(); - connectionLock = new Object(); - } - - private static string MakeKey(AmqpChannelProperties props) - { - StringBuilder sb = new StringBuilder(); - sb.Append(props.BrokerHost); - sb.Append(':'); - sb.Append(props.BrokerPort); - sb.Append(':'); - sb.Append(props.TransferMode); - - AmqpTransportSecurity sec = props.AmqpTransportSecurity; - if (sec == null) - { - return sb.ToString(); - } - - if (sec.UseSSL) - { - sb.Append(":SSL"); - } - - if (sec.CredentialType == AmqpCredentialType.Plain) - { - sb.Append(":saslP"); - AmqpCredential cred = props.AmqpCredential; - if (cred != null) - { - sb.Append(":NM:"); - sb.Append(cred.UserName); - sb.Append(":PW:"); - sb.Append(cred.Password); - } - } - - return sb.ToString(); - } - - private static ManagedConnection GetManagedConnection(AmqpChannelProperties channelProperties, bool connectionSharing) - { - if (connectionSharing) - { - string key = MakeKey(channelProperties); - lock (connectionLock) - { - ManagedConnection mc = null; - if (!sharedInstances.TryGetValue(key, out mc)) - { - mc = new ManagedConnection(true); - sharedInstances.Add(key, mc); - } - return mc; - } - } - else - { - lock (connectionLock) - { - if (unsharedInstance == null) - { - unsharedInstance = new ManagedConnection(false); - } - return unsharedInstance; - } - } - } - - public static OutputLink GetOutputLink(AmqpChannelProperties channelProperties, bool connectionSharing, bool sessionSharing, string qname) - { - ManagedConnection mc = GetManagedConnection(channelProperties, connectionSharing); - return (OutputLink)mc.GetLink(channelProperties, sessionSharing, null, qname); - } - - public static InputLink GetInputLink(AmqpChannelProperties channelProperties, bool connectionSharing, bool sessionSharing, string qname) - { - ManagedConnection mc = GetManagedConnection(channelProperties, connectionSharing); - return (InputLink)mc.GetLink(channelProperties, sessionSharing, qname, null); - } - - - - class ManagedConnection - { - private Boolean shared; - private AmqpConnection sharedConnection; - //private Dictionary sharedSessions; - - public ManagedConnection(bool shared) - { - this.shared = shared; - } - - - public object GetLink(AmqpChannelProperties channelProperties, bool sessionSharing, string inputQueue, string outputQueue) - { - AmqpConnection connection = null; - AmqpSession session = null; - Object link = null; - bool newConnection = false; - //bool newSession = false; - bool success = false; - - // when called in the non-shared case, only stack variables should be used for holding connections/sessions/links - - if (this.shared) - { - Monitor.Enter(this); // lock - } - - try - { - if (this.shared) - { - // TODO: check shared connection not closed (i.e. network drop) and refresh this instance if needed - if (sessionSharing) - { - throw new NotImplementedException("shared session"); - /* * ... once we have a defined shared session config parameter: - - // lazilly create - if (this.sharedSessions == null) - { - this.sharedSessions = new Dictionary(); - } - - alreadydeclaredstring sessionKey = channelProperties.name_of_key_goes_here; - this.sharedSessions.TryGetValue(sessionKey, out session); - - * */ - } - - if (this.sharedConnection != null) - { - connection = this.sharedConnection; - } - } - - if (connection == null) - { - if (channelProperties.AmqpSecurityMode != AmqpSecurityMode.None) - { - string user = null; - string passwd = null; - bool ssl = false; - bool saslPlain = false; - - AmqpTransportSecurity tsec = channelProperties.AmqpTransportSecurity; - if (tsec.UseSSL) - { - ssl = true; - } - - if (tsec.CredentialType == AmqpCredentialType.Plain) - { - saslPlain = true; - AmqpCredential plainCred = channelProperties.AmqpCredential; - if (plainCred != null) - { - user = plainCred.UserName; - passwd = plainCred.Password; - } - } - - connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort, - ssl, saslPlain, user, passwd); - } - else - { - connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort); - } - - newConnection = true; - if (this.shared) - { - connection.OnConnectionIdle += new ConnectionIdleEventHandler(this.IdleConnectionHandler); - } - else - { - connection.OnConnectionIdle += new ConnectionIdleEventHandler(UnsharedIdleConnectionHandler); - } - } - - if (session == null) - { - session = connection.CreateSession(); - //newSession = true; - } - - if (inputQueue != null) - { - link = session.CreateInputLink(inputQueue); - } - else - { - link = session.CreateOutputLink(outputQueue); - } - - if (this.shared) - { - if (newConnection) - { - this.sharedConnection = connection; - } - /* - if (newSession) - { - sharedSessions.Add(foo, session); - } - * */ - } - - success = true; - } - finally - { - if (this.shared) - { - Monitor.Exit(this); - } - if (!success) - { - /* - if (newSession) - { - session.Close(); - } - */ - if (newConnection) - { - connection.Close(); - } - } - } - - return link; - } - - - static void UnsharedIdleConnectionHandler(Object sender, EventArgs empty) - { - if (sender is AmqpConnection) - { - AmqpConnection connection = (AmqpConnection)sender; - connection.Close(); - } - } - - void IdleConnectionHandler(Object sender, EventArgs empty) - { - lock (this) - { - if (sharedConnection != sender || sharedConnection == null) - { - return; - } - if (!sharedConnection.IsIdle) - { - // Another thread made the connection busy again. - // That's OK. Another idle event will come along later. - return; - } - sharedConnection.Close(); // also closes all child sessions - sharedConnection = null; - //sharedSessions = null; - } - } - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs b/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs deleted file mode 100644 index edd9a056a7..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs +++ /dev/null @@ -1,52 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -using System.Reflection; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; - -// General Information about an assembly is controlled through the following -// set of attributes. Change these attribute values to modify the information -// associated with an assembly. -[assembly: AssemblyTitle("Apache.Qpid.Channel")] -[assembly: AssemblyDescription("")] -[assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("")] -[assembly: AssemblyProduct("")] -[assembly: AssemblyCopyright("")] -[assembly: AssemblyTrademark("")] -[assembly: AssemblyCulture("")] - -// Setting ComVisible to true makes the types in this assembly visible -// to COM components. This is required for this to be used by an -// Excel RTD component. -[assembly: ComVisible(true)] - -// The following GUID is for the ID of the typelib if this project is exposed to COM -[assembly: Guid("ac02bbb0-2c19-43fb-a36c-b1b0a50eaf1a")] - -// Version information for an assembly consists of the following four values: -// -// Major Version -// Minor Version -// Build Number -// Revision -// -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs deleted file mode 100644 index 5925fa47dc..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs +++ /dev/null @@ -1,374 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.IO; - using System.ServiceModel.Channels; - using System.Xml; - - // This incoming Message is backed either by a Stream (bodyStream) or a byte array (bodyBytes). - // If bodyBytes belongs to a BufferManager, we must return it when done. - // The pay-off is OnGetReaderAtBodyContents(). - // Most of the complexity is dealing with the OnCreateBufferedCopy() machinery. - internal class RawMessage : Message - { - private MessageHeaders headers; - private MessageProperties properties; - private XmlDictionaryReaderQuotas readerQuotas; - private Stream bodyStream; - private byte[] bodyBytes; - private int index; - private int count; - private BufferManager bufferManager; - - public RawMessage(byte[] buffer, int index, int count, BufferManager bufferManager, XmlDictionaryReaderQuotas quotas) - { - // this constructor supports MessageEncoder.ReadMessage(ArraySegment b, BufferManager mgr, string contentType) - if (quotas == null) - { - quotas = new XmlDictionaryReaderQuotas(); - } - - this.headers = new MessageHeaders(MessageVersion.None); - this.properties = new MessageProperties(); - this.readerQuotas = quotas; - this.bodyBytes = buffer; - this.index = index; - this.count = count; - this.bufferManager = bufferManager; - } - - public RawMessage(Stream stream, XmlDictionaryReaderQuotas quotas) - { - // this constructor supports MessageEncoder.ReadMessage(System.IO.Stream s, int max, string contentType) - if (quotas == null) - { - quotas = new XmlDictionaryReaderQuotas(); - } - - this.headers = new MessageHeaders(MessageVersion.None); - this.properties = new MessageProperties(); - this.bodyStream = stream; - } - - public RawMessage(MessageHeaders headers, MessageProperties properties, byte[] bytes, int index, int count, XmlDictionaryReaderQuotas quotas) - { - // this constructor supports internal needs for CreateBufferedCopy().CreateMessage() - this.headers = new MessageHeaders(headers); - this.properties = new MessageProperties(properties); - this.bodyBytes = bytes; - this.index = index; - this.count = count; - this.readerQuotas = quotas; - } - - public override MessageHeaders Headers - { - get - { - if (this.IsDisposed) - { - throw new ObjectDisposedException("message"); - } - - return this.headers; - } - } - - public override bool IsEmpty - { - get - { - if (this.IsDisposed) - { - throw new ObjectDisposedException("message"); - } - - return false; - } - } - - public override bool IsFault - { - get - { - if (this.IsDisposed) - { - throw new ObjectDisposedException("message"); - } - - return false; - } - } - - public override MessageProperties Properties - { - get - { - if (this.IsDisposed) - { - throw new ObjectDisposedException("message"); - } - - return this.properties; - } - } - - public override MessageVersion Version - { - get - { - if (this.IsDisposed) - { - throw new ObjectDisposedException("message"); - } - - return MessageVersion.None; - } - } - - protected override void OnBodyToString(XmlDictionaryWriter writer) - { - if (this.bodyStream != null) - { - writer.WriteString("Stream"); - } - else - { - writer.WriteStartElement(RawMessageEncoder.StreamElementName, string.Empty); - writer.WriteBase64(this.bodyBytes, this.index, this.count); - writer.WriteEndElement(); - } - } - - protected override void OnClose() - { - Exception deferEx = null; - try - { - base.OnClose(); - } - catch (Exception e) - { - deferEx = e; - } - - try - { - if (this.properties != null) - { - this.properties.Dispose(); - } - } - catch (Exception e) - { - if (deferEx == null) - { - deferEx = e; - } - } - - try - { - if (this.bufferManager != null) - { - this.bufferManager.ReturnBuffer(this.bodyBytes); - this.bufferManager = null; - } - } - catch (Exception e) - { - if (deferEx == null) - { - deferEx = e; - } - } - - if (deferEx != null) - { - throw deferEx; - } - } - - protected override MessageBuffer OnCreateBufferedCopy(int maxBufferSize) - { - if (this.bodyStream != null) - { - int len = (int)this.bodyStream.Length; - byte[] buf = new byte[len]; - this.bodyStream.Read(buf, 0, len); - this.bodyStream = null; - this.bodyBytes = buf; - this.count = len; - this.index = 0; - } - else - { - if (this.bufferManager != null) - { - // we could take steps to share the buffer among copies and release the memory - // after the last user finishes by a reference count or such, but we are already - // far from the intended optimized use. Make one GC managed memory copy that is - // shared by all. - byte[] buf = new byte[this.count]; - - Buffer.BlockCopy(this.bodyBytes, this.index, buf, 0, this.count); - this.bufferManager.ReturnBuffer(this.bodyBytes); - this.bufferManager = null; - this.bodyBytes = buf; - this.index = 0; - } - } - - return new RawMessageBuffer(this.headers, this.properties, this.bodyBytes, this.index, this.count, this.readerQuotas); - } - - protected override XmlDictionaryReader OnGetReaderAtBodyContents() - { - Stream readerStream = null; - bool ownsStream; - - if (this.bodyStream != null) - { - readerStream = this.bodyStream; - ownsStream = false; - } - else - { - // create stream for duration of XmlReader. - ownsStream = true; - if (this.bufferManager != null) - { - readerStream = new RawMemoryStream(this.bodyBytes, this.index, this.count, this.bufferManager); - this.bufferManager = null; - } - else - { - readerStream = new MemoryStream(this.bodyBytes, this.index, this.count, false); - } - } - - return new RawXmlReader(readerStream, this.readerQuotas, ownsStream); - } - - protected override void OnWriteBodyContents(XmlDictionaryWriter writer) - { - writer.WriteStartElement(RawMessageEncoder.StreamElementName, string.Empty); - if (this.bodyStream != null) - { - int len = (int)this.bodyStream.Length; - byte[] buf = new byte[len]; - this.bodyStream.Read(buf, 0, len); - writer.WriteBase64(buf, 0, len); - } - else - { - writer.WriteBase64(this.bodyBytes, this.index, this.count); - } - - writer.WriteEndElement(); - } - - private class RawMemoryStream : MemoryStream - { - private BufferManager bufferManager; - private byte[] buffer; - - public RawMemoryStream(byte[] bytes, int index, int count, BufferManager mgr) - : base(bytes, index, count, false) - { - this.bufferManager = mgr; - this.buffer = bytes; - } - - protected override void Dispose(bool disposing) - { - if (this.bufferManager != null) - { - try - { - this.bufferManager.ReturnBuffer(this.buffer); - } - finally - { - this.bufferManager = null; - base.Dispose(disposing); - } - } - } - } - - private class RawMessageBuffer : MessageBuffer - { - private bool closed; - private MessageHeaders headers; - private MessageProperties properties; - private byte[] bodyBytes; - private int index; - private int count; - private XmlDictionaryReaderQuotas readerQuotas; - - public RawMessageBuffer(MessageHeaders headers, MessageProperties properties, byte[] bytes, int index, int count, XmlDictionaryReaderQuotas quotas) - : base() - { - this.headers = new MessageHeaders(headers); - this.properties = new MessageProperties(properties); - this.bodyBytes = bytes; - this.index = index; - this.count = count; - this.readerQuotas = new XmlDictionaryReaderQuotas(); - quotas.CopyTo(this.readerQuotas); - } - - public override int BufferSize - { - get { return this.count; } - } - - public override void Close() - { - if (!this.closed) - { - this.closed = true; - this.headers = null; - if (this.properties != null) - { - this.properties.Dispose(); - this.properties = null; - } - - this.bodyBytes = null; - this.readerQuotas = null; - } - } - - public override Message CreateMessage() - { - if (this.closed) - { - throw new ObjectDisposedException("message"); - } - - return new RawMessage(this.headers, this.properties, this.bodyBytes, this.index, this.count, this.readerQuotas); - } - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs deleted file mode 100644 index 76dae6f6c7..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs +++ /dev/null @@ -1,113 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.IO; - using System.ServiceModel.Channels; - using System.ServiceModel; - using System.Xml; - - - class RawMessageEncoder : MessageEncoder - { - public const string StreamElementName = "Binary"; - - XmlDictionaryReaderQuotas readerQuotas; - - public RawMessageEncoder(XmlDictionaryReaderQuotas quotas) - { - this.readerQuotas = new XmlDictionaryReaderQuotas(); - if (quotas != null) - { - quotas.CopyTo(this.readerQuotas); - } - } - - public override string ContentType - { - get { return null; } - } - - public override bool IsContentTypeSupported(string contentType) - { - return true; - } - - public override string MediaType - { - get { return null; } - } - - public override MessageVersion MessageVersion - { - get { return MessageVersion.None; } - } - - public override Message ReadMessage(ArraySegment buffer, BufferManager bufferManager, string contentType) - { - RawMessage message = new RawMessage(buffer.Array, buffer.Offset, buffer.Count, bufferManager, readerQuotas); - message.Properties.Encoder = this; - return message; - } - - public override Message ReadMessage(Stream stream, int maxSizeOfHeaders, string contentType) - { - RawMessage message = new RawMessage(stream, readerQuotas); - message.Properties.Encoder = this; - return message; - } - - private void CheckType(XmlDictionaryReader reader, XmlNodeType type) - { - if (reader.NodeType != type) - { - throw new System.IO.InvalidDataException(String.Format("RawMessageEncoder xml check {0} type should be {1}", type, reader.NodeType)); - } - } - - public override ArraySegment WriteMessage(Message message, int maxMessageSize, BufferManager bufferManager, int messageOffset) - { - MemoryStream tempStream = new MemoryStream(); - this.WriteMessage(message, tempStream); - int len = messageOffset + (int)tempStream.Length; - byte[] buf = bufferManager.TakeBuffer(len); - MemoryStream targetStream = new MemoryStream(buf); - if (messageOffset > 0) - { - targetStream.Seek(messageOffset, SeekOrigin.Begin); - } - - tempStream.WriteTo(targetStream); - targetStream.Close(); - - return new ArraySegment(buf, messageOffset, len - messageOffset); - } - - public override void WriteMessage(Message message, Stream stream) - { - using (XmlWriter writer = new RawXmlWriter(stream)) - { - message.WriteMessage(writer); - writer.Flush(); - } - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs deleted file mode 100644 index 5c015f9a1b..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs +++ /dev/null @@ -1,45 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.Xml; - using System.ServiceModel.Channels; - - internal class RawMessageEncoderFactory : MessageEncoderFactory - { - RawMessageEncoder encoder; - - public RawMessageEncoderFactory(XmlDictionaryReaderQuotas quotas) - { - this.encoder = new RawMessageEncoder(quotas); - } - - public override MessageEncoder Encoder - { - get { return this.encoder; } - } - - public override MessageVersion MessageVersion - { - get { return encoder.MessageVersion; } - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs deleted file mode 100644 index 5ec10a976d..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs +++ /dev/null @@ -1,102 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.ServiceModel.Channels; - - public class RawMessageEncodingBindingElement : MessageEncodingBindingElement - { - - public RawMessageEncodingBindingElement() - : base() - { - } - - RawMessageEncodingBindingElement(RawMessageEncodingBindingElement originalBindingElement) - { - } - - public override MessageEncoderFactory CreateMessageEncoderFactory() - { - return new RawMessageEncoderFactory(null); - } - - - public override IChannelFactory BuildChannelFactory(BindingContext context) - { - if (context == null) - throw new ArgumentNullException("context"); - - context.BindingParameters.Add(this); - return context.BuildInnerChannelFactory(); - } - - public override bool CanBuildChannelFactory(BindingContext context) - { - if (context == null) - throw new ArgumentNullException("context"); - - return context.CanBuildInnerChannelFactory(); - } - - public override IChannelListener BuildChannelListener(BindingContext context) - { - if (context == null) - throw new ArgumentNullException("context"); - - context.BindingParameters.Add(this); - return context.BuildInnerChannelListener(); - } - - public override bool CanBuildChannelListener(BindingContext context) - { - if (context == null) - throw new ArgumentNullException("context"); - - context.BindingParameters.Add(this); - return context.CanBuildInnerChannelListener(); - } - - - public override BindingElement Clone() - { - return new RawMessageEncodingBindingElement(this); - } - - - - public override MessageVersion MessageVersion - { - get - { - return MessageVersion.None; - } - - set - { - if (value != MessageVersion.None) - throw new ArgumentException("Unsupported message version"); - } - } - - - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs deleted file mode 100644 index 8fadfce441..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs +++ /dev/null @@ -1,353 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.IO; - using System.Xml; - - internal class RawXmlReader : XmlDictionaryReader - { - ////this class presents a hardcoded XML InfoSet: "X" where X is the entire stream content - - private Stream stream; - private bool closed; - private bool streamOwner; - private ReaderPosition position; - private string contentAsBase64; - private XmlNameTable xmlNameTable; - private XmlDictionaryReaderQuotas readerQuotas; - - public RawXmlReader(Stream stream, XmlDictionaryReaderQuotas quotas, bool streamOwner) - { - this.stream = stream; - this.streamOwner = streamOwner; - if (quotas == null) - { - this.readerQuotas = new XmlDictionaryReaderQuotas(); - } - else - { - this.readerQuotas = quotas; - } - } - - private enum ReaderPosition - { - None, - StartElement, - Content, - EndElement, - EOF - } - - public override int AttributeCount - { - get { return 0; } - } - - public override string BaseURI - { - get { return string.Empty; } - } - - public override int Depth - { - get { return (this.position == ReaderPosition.Content) ? 1 : 0; } - } - - public override bool EOF - { - get { return this.position == ReaderPosition.EOF; } - } - - public override bool HasAttributes - { - get { return false; } - } - - public override bool HasValue - { - get { return this.position == ReaderPosition.Content; } - } - - public override bool IsEmptyElement - { - get { return false; } - } - - public override string LocalName - { - get - { - if (this.position == ReaderPosition.StartElement) - { - return RawMessageEncoder.StreamElementName; - } - - return null; - } - } - - public override string NamespaceURI - { - get { return string.Empty; } - } - - public override XmlNameTable NameTable - { - get - { - if (this.xmlNameTable == null) - { - this.xmlNameTable = new NameTable(); - this.xmlNameTable.Add(RawMessageEncoder.StreamElementName); - } - - return this.xmlNameTable; - } - } - - public override XmlNodeType NodeType - { - get - { - switch (this.position) - { - case ReaderPosition.StartElement: - return XmlNodeType.Element; - case ReaderPosition.Content: - return XmlNodeType.Text; - case ReaderPosition.EndElement: - return XmlNodeType.EndElement; - default: - // and StreamPosition.EOF - return XmlNodeType.None; - } - } - } - - public override string Prefix - { - get { return string.Empty; } - } - - public override ReadState ReadState - { - get - { - switch (this.position) - { - case ReaderPosition.None: - return ReadState.Initial; - case ReaderPosition.StartElement: - case ReaderPosition.Content: - case ReaderPosition.EndElement: - return ReadState.Interactive; - case ReaderPosition.EOF: - return ReadState.Closed; - default: - return ReadState.Error; - } - } - } - - public override string Value - { - get - { - switch (this.position) - { - case ReaderPosition.Content: - if (this.contentAsBase64 == null) - { - this.contentAsBase64 = Convert.ToBase64String(this.ReadContentAsBase64()); - } - - return this.contentAsBase64; - - default: - return string.Empty; - } - } - } - - public override void Close() - { - if (!this.closed) - { - this.closed = true; - this.position = ReaderPosition.EOF; - this.readerQuotas = null; - if (this.streamOwner) - { - this.stream.Close(); - } - } - } - - public override string GetAttribute(int i) - { - throw new ArgumentOutOfRangeException("i", i, "Argument not in set of valid values"); - } - - public override string GetAttribute(string name, string namespaceURI) - { - return null; - } - - public override string GetAttribute(string name) - { - return null; - } - - public override string LookupNamespace(string prefix) - { - if (prefix == string.Empty) - { - return string.Empty; - } - else if (prefix == "xml") - { - return "http://www.w3.org/XML/1998/namespace"; - } - else if (prefix == "xmlns") - { - return "http://www.w3.org/2000/xmlns/"; - } - else - { - return null; - } - } - - public override bool MoveToAttribute(string name, string ns) - { - return false; - } - - public override bool MoveToAttribute(string name) - { - return false; - } - - public override bool MoveToElement() - { - if (this.position == ReaderPosition.None) - { - this.position = ReaderPosition.StartElement; - return true; - } - - return false; - } - - public override bool MoveToFirstAttribute() - { - return false; - } - - public override bool MoveToNextAttribute() - { - return false; - } - - public override bool Read() - { - switch (this.position) - { - case ReaderPosition.None: - this.position = ReaderPosition.StartElement; - return true; - case ReaderPosition.StartElement: - this.position = ReaderPosition.Content; - return true; - case ReaderPosition.Content: - this.position = ReaderPosition.EndElement; - return true; - case ReaderPosition.EndElement: - this.position = ReaderPosition.EOF; - return false; - case ReaderPosition.EOF: - return false; - default: - return false; - } - } - - public override bool ReadAttributeValue() - { - return false; - } - - public override int ReadContentAsBase64(byte[] buffer, int index, int count) - { - if (buffer == null) - { - throw new ArgumentNullException("buffer"); - } - - if (this.position != ReaderPosition.Content) - { - throw new InvalidOperationException("XML reader not in Element"); - } - - if (count == 0) - { - return 0; - } - - int readCount = this.stream.Read(buffer, index, count); - if (readCount == 0) - { - this.position = ReaderPosition.EndElement; - } - - return readCount; - } - - public override int ReadContentAsBinHex(byte[] buffer, int index, int count) - { - throw new NotSupportedException(); - } - - public override void ResolveEntity() - { - throw new NotSupportedException(); - } - - public override bool TryGetBase64ContentLength(out int length) - { - // The whole stream is this one element - if (!this.closed && this.stream.CanSeek) - { - long streamLength = this.stream.Length; - if (streamLength <= int.MaxValue) - { - length = (int)streamLength; - return true; - } - } - - length = -1; - return false; - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs deleted file mode 100644 index 7d05b70807..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs +++ /dev/null @@ -1,221 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -namespace Apache.Qpid.Channel -{ - using System; - using System.IO; - using System.Xml; - - internal sealed class RawXmlWriter : XmlDictionaryWriter - { - - WriteState state; - Stream stream; - bool closed; - bool rawWritingEnabled; - - public RawXmlWriter(Stream stream) - { - if (stream == null) - { - throw new ArgumentNullException("Stream"); - } - - this.stream = stream; - this.state = WriteState.Start; - } - - public override WriteState WriteState - { - get - { - return this.state; - } - } - - public override void Close() - { - if (!this.closed) - { - this.closed = true; - this.state = WriteState.Closed; - this.rawWritingEnabled = false; - } - } - - public override void Flush() - { - this.ThrowIfClosed(); - this.stream.Flush(); - } - - public override string LookupPrefix(string ns) - { - return null; - } - - public override void WriteBase64(byte[] buffer, int index, int count) - { - if (buffer == null) - { - throw new ArgumentNullException("buffer"); - } - - ThrowIfClosed(); - - if (!this.rawWritingEnabled) - { - throw new InvalidOperationException("XmlWriter not in Element"); - } - - this.stream.Write(buffer, index, count); - this.state = WriteState.Content; - } - - public override void WriteStartElement(string prefix, string localName, string ns) - { - ThrowIfClosed(); - if (this.state != WriteState.Start) - { - throw new InvalidOperationException("Start Element Already Called"); - } - - if (!string.IsNullOrEmpty(prefix) || !string.IsNullOrEmpty(ns) || localName != RawMessageEncoder.StreamElementName) - { - throw new XmlException("Wrong XML Start Element Name"); - } - this.state = WriteState.Element; - this.rawWritingEnabled = true; - } - - public override void WriteEndElement() - { - ThrowIfClosed(); - if (!this.rawWritingEnabled) - { - throw new InvalidOperationException("Unexpected End Element"); - } - this.rawWritingEnabled = false; - } - - public override void WriteFullEndElement() - { - this.WriteEndElement(); - } - - public override void WriteEndDocument() - { - this.rawWritingEnabled = false; - this.ThrowIfClosed(); - } - - public override void WriteStartDocument() - { - this.rawWritingEnabled = false; - this.ThrowIfClosed(); - } - - public override void WriteStartDocument(bool standalone) - { - this.rawWritingEnabled = false; - this.ThrowIfClosed(); - } - - private void ThrowIfClosed() - { - if (this.closed) - { - throw new InvalidOperationException("XML Writer closed"); - } - } - - - public override void WriteString(string text) - { - throw new NotSupportedException(); - } - - public override void WriteCData(string text) - { - throw new NotSupportedException(); - } - - public override void WriteCharEntity(char ch) - { - throw new NotSupportedException(); - } - - public override void WriteChars(char[] buffer, int index, int count) - { - throw new NotSupportedException(); - } - - public override void WriteComment(string text) - { - throw new NotSupportedException(); - } - - public override void WriteDocType(string name, string pubid, string sysid, string subset) - { - throw new NotSupportedException(); - } - - public override void WriteEndAttribute() - { - throw new NotSupportedException(); - } - - public override void WriteEntityRef(string name) - { - throw new NotSupportedException(); - } - - - public override void WriteProcessingInstruction(string name, string text) - { - throw new NotSupportedException(); - } - - public override void WriteRaw(string data) - { - throw new NotSupportedException(); - } - - public override void WriteRaw(char[] buffer, int index, int count) - { - throw new NotSupportedException(); - } - - public override void WriteStartAttribute(string prefix, string localName, string ns) - { - throw new NotSupportedException(); - } - - public override void WriteSurrogateCharEntity(char lowChar, char highChar) - { - throw new NotSupportedException(); - } - - public override void WriteWhitespace(string ws) - { - throw new NotSupportedException(); - } - } -} diff --git a/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp b/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp deleted file mode 100644 index ec98289923..0000000000 --- a/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp +++ /dev/null @@ -1,797 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - - -// -// This module provides the backend recovery driver for Windows resource managers based on -// the IDtcToXaHelperSinglePipe interface. The dll is loaded (LoadLibrary) directly into DTC -// itself and runs at a different protection level from the resource manager instance, which -// runs inside the application. -// -// The DTC dynamically loads this file, calls GetXaSwitch() to access the XA interface -// implementation and unloads the dll when done. -// -// This DTC plugin is only called for registration and recovery. Each time the application -// registers the Qpid resource manager with DTC, the plugin is loaded and a successful -// connection via xa_open is confirmed before completing registration and saving the DSN -// connection string in the DTC log for possible recovery. On recovery, the DSN is re-used to -// re-establish a new connection with the broker and perform recovery. -// -// Because this plugin is not involved in coordinating any active transactions it only needs to -// partially implement the XA interface. -// -// For the same reason, the locking strategy is simple. A single global lock is used. -// Whenever networking activity is about to take place, the lock is relinquished and retaken -// soon thereafter. - - -#include -#include -#include -#include -#include - -#include "qpid/client/AsyncSession.h" -#include "qpid/client/Connection.h" -#include "qpid/framing/FieldValue.h" - - -#include -#include -#include - -namespace Apache { -namespace Qpid { -namespace DtcPlugin { - -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::framing::dtx; - -class ResourceManager -{ -private: - Connection qpidConnection; - Session qpidSession; - bool active; - std::string host; - int port; - std::string username; - std::string password; - bool ssl; - bool saslPlain; - - int rmid; - std::vector inDoubtXids; - // current scan position, or -1 if no scan - int cursor; -public: - ResourceManager(int id, std::string h, int p, bool sslP, bool saslPlainP, std::string uname, std::string pass) - : rmid(id), host(h), port(p), ssl(sslP), saslPlain(saslPlainP), username(uname), password(pass), - active(false), cursor(-1) {} - ~ResourceManager() {} - INT open(); - INT close(); - INT commit(XID *xid); - INT rollback(XID *xid); - INT recover(XID *xids, long count, long flags); -}; - - -CRITICAL_SECTION rmLock; - -std::map rmMap; -HMODULE thisDll = NULL; -bool memLocked = false; - -#define QPIDHMCHARS 512 - - -void pinDll() { - if (!memLocked) { - char thisDllName[QPIDHMCHARS]; - HMODULE ignore; - - DWORD nc = GetModuleFileName(thisDll, thisDllName, QPIDHMCHARS); - if ((nc > 0) && (nc < QPIDHMCHARS)) { - memLocked = GetModuleHandleEx(GET_MODULE_HANDLE_EX_FLAG_PIN, thisDllName, &ignore); - } - } -} - - -void XaToQpid(XID &winXid, Xid &qpidXid) { - // convert from XA defined structure XID to the Qpid framing structure - qpidXid.setFormat((uint32_t) winXid.formatID); - int bqualPos = 0; - if (winXid.gtrid_length > 0) { - qpidXid.setGlobalId(std::string(winXid.data, winXid.gtrid_length)); - bqualPos = winXid.gtrid_length; - } - if (winXid.bqual_length > 0) { - qpidXid.setBranchId(std::string(winXid.data + bqualPos, winXid.bqual_length)); - } -} - - -// this function assumes that the qpidXid has already been validated for the memory copy - -void QpidToXa(Xid &qpidXid, XID &winXid) { - // convert from the Qpid framing structure to the XA defined structure XID - winXid.formatID = qpidXid.getFormat(); - - const std::string& global_s = qpidXid.getGlobalId(); - size_t gl = global_s.size(); - winXid.gtrid_length = (long) gl; - if (gl > 0) - global_s.copy(winXid.data, gl); - - const std::string branch_s = qpidXid.getBranchId(); - size_t bl = branch_s.size(); - winXid.bqual_length = (long) bl; - if (bl > 0) - branch_s.copy(winXid.data + gl, bl); -} - - -static char *dsnHeader = "QPIDdsnV2"; - -const char* nextDot(const char *p) { - while (*p && (*p != '.')) - p++; - return p; -} - -int getHexChar (char c) { - if ((c >= '0') && (c <= '9')) - return c - '0'; - - if ((c >= 'a') && (c <= 'f')) - return 10 + (c - 'a'); - - if ((c >= 'A') && (c <= 'F')) - return 10 + (c - 'A'); - - return -1; -} - -bool parseFromHex(const char* start, const char* end, std::string& target) -{ - const char *p = start; - - while ((p + 1) < end) { - int nibble = getHexChar(*p++); - if (nibble < 0) - return false; - int byte = (nibble << 4); - nibble = getHexChar(*p++); - if (nibble < 0) - return false; - byte += nibble; - target.append (1, (char) byte & 0xFF); - } - return (p == end); -} - - -// parse string from AmqpConnection::DataSourcename -// "QPIDdsnV2.port.host.instance_id.SSL_tf.SASL_mech.username.password" -// -// parse strictly and return false if the dsn is in a bad format - -bool parseDsn (const char *dsn, std::string& host, int& port, bool& ssl, bool& saslPlain, - std::string& username, std::string& password) { - if (dsn == NULL) - return false; - - size_t len = strnlen(dsn, 1025); - if (len > 1024) - return false; - - if (strncmp(dsn, dsnHeader, strlen(dsnHeader))) - return false; - - const char *endp = dsn + len; - const char *tokenp = dsn + strlen(dsnHeader); - if (*tokenp != '.') - return false; - - // port - tokenp++; - if (tokenp >= endp) - return false; - if (*tokenp == '.') - return false; // null port not allowed - - const char *token_end = nextDot(tokenp); - if ((token_end - tokenp) > 5) - return false; - - port = 0; - for (const char *p = tokenp; p < token_end; p++) { - if ((*p < '0') || (*p > '9')) - return false; - port = (10 * port) + (*p - '0'); - } - - if (port > 65535) - return false; - - // host - tokenp = token_end + 1; - if (tokenp >= endp) - return false; - if (*tokenp == '.') - return false; // null host not allowed - - token_end = nextDot(tokenp); - if (!parseFromHex(tokenp, token_end, host)) - return false; - - // skip the RM identifier, but verify it exists - tokenp = token_end + 1; - if (tokenp >= endp) - return false; - token_end = nextDot (tokenp); - if ((token_end - tokenp) < 3) - return false; - - // ssl: look for T or F - tokenp = token_end + 1; - if (tokenp >= endp) - return false; - if (*tokenp == 'T') - ssl = true; - else if (*tokenp == 'F') - ssl = false; - else - return false; - if (*++tokenp != '.') - return false; - - // sasl mechanism: A = anonymous, P = plain. More to come... - ++tokenp; - if (tokenp >= endp) - return false; - if (*(tokenp+1) != '.') - return false; - - if (*tokenp == 'A') { - saslPlain = false; - tokenp += 2; - // no auth tokens - } - else if (*tokenp == 'P') { - saslPlain = true; - tokenp += 2; - if (tokenp >= endp) - return false; - token_end = nextDot (tokenp); - if (!parseFromHex(tokenp, token_end, username)) - return false; - tokenp = token_end + 1; - - if (tokenp >= endp) - return false; - token_end = nextDot (tokenp); - if (!parseFromHex(tokenp, token_end, password)) - return false; - tokenp = token_end + 1; - } - else - return false; - - return (tokenp == endp); -} - - - -INT ResourceManager::open() { - INT rv = XAER_RMERR; // placeholder until we successfully connect to resource - active = true; - LeaveCriticalSection(&rmLock); - - try { - ConnectionSettings settings; - settings.host = this->host; - settings.port = this->port; - - - if (ssl) - settings.protocol = "ssl"; - - if (saslPlain) { - settings.username = this->username; - settings.password = this->password; - settings.mechanism = "PLAIN"; - } - - qpidConnection.open(settings); - qpidSession = qpidConnection.newSession(); - rv = XA_OK; -/* -TODO: logging - } catch (const qpid::Exception& error) { - // log it - } catch (const std::exception& e2) { - // log it -*/ - } catch (...) { - // TODO: log it - } - - EnterCriticalSection(&rmLock); - active = false; - return rv; -} - - -INT ResourceManager::close() { - // should never be called when already sending other commands to broker - if (active) - return XAER_PROTO; - - INT rv = XAER_RMERR; // placeholder until we successfully close resource - active = true; - LeaveCriticalSection(&rmLock); - try { - if (qpidSession.isValid()) { - qpidSession.close(); - } - if (qpidConnection.isOpen()) { - qpidConnection.close(); - } - } catch (...) { - // TODO: log it - } - - EnterCriticalSection(&rmLock); - active = false; - - if (!qpidConnection.isOpen()) { - rv = XA_OK; - } - return rv; -} - - -INT ResourceManager::commit(XID *xid) { - if (active) - return XAER_PROTO; - - INT rv = XAER_RMFAIL; - active = true; - LeaveCriticalSection(&rmLock); - - try { - qpid::framing::Xid qpidXid; - XaToQpid(*xid, qpidXid); - - XaResult xaResult = qpidSession.dtxCommit(qpidXid, false, true); - if (xaResult.hasStatus()) { - uint16_t status = xaResult.getStatus(); - switch ((XaStatus) status) { - case XA_STATUS_XA_OK: - case XA_STATUS_XA_RDONLY: - case XA_STATUS_XA_HEURCOM: - rv = XA_OK; - break; - - default: - // commit failed and a retry won't fix - rv = XAER_RMERR; - break; - } - - } - } catch (...) { - // TODO: log it - } - - EnterCriticalSection(&rmLock); - active = false; - return rv; -} - - -INT ResourceManager::rollback(XID *xid) { - if (active) - return XAER_PROTO; - - INT rv = XAER_RMFAIL; - active = true; - LeaveCriticalSection(&rmLock); - - try { - qpid::framing::Xid qpidXid; - XaToQpid(*xid, qpidXid); - - XaResult xaResult = qpidSession.dtxRollback(qpidXid, true); - if (xaResult.hasStatus()) { - uint16_t status = xaResult.getStatus(); - switch ((XaStatus) status) { - case XA_STATUS_XA_OK: - case XA_STATUS_XA_HEURRB: - rv = XA_OK; - break; - - default: - // RM internal error - rv = XA_RBPROTO; - break; - } - } - } catch (...) { - // TODO: log it - } - - EnterCriticalSection(&rmLock); - active = false; - return rv; -} - - -INT ResourceManager::recover(XID *xids, long count, long flags) { - if (active) - return XAER_PROTO; - - if ((xids == NULL) && (count != 0)) - return XAER_INVAL; - - if (count < 0) - return XAER_INVAL; - - if (!(flags & TMSTARTRSCAN) && (cursor == -1)) - // no existing scan and no scan requested - return XAER_INVAL; - - INT status = XA_OK; - - if (flags & TMSTARTRSCAN) { - // start a fresh scan - cursor = -1; - inDoubtXids.clear(); - active = true; - LeaveCriticalSection(&rmLock); - - try { - // status if we can't talk to the broker - status = XAER_RMFAIL; - - DtxRecoverResult dtxrr = qpidSession.dtxRecover(true); - - // status if we can't process the xids - status = XAER_RMERR; - - std::vector wireFormatXids(dtxrr.getInDoubt().size()); - std::transform(dtxrr.getInDoubt().begin(), dtxrr.getInDoubt().end(), wireFormatXids.begin(), Array::get); - - size_t nXids = wireFormatXids.size(); - - if (nXids > 0) { - StructHelper decoder; - Xid qpidXid; - for (size_t i = 0; i < nXids; i++) { - decoder.decode (qpidXid, wireFormatXids[i]); - inDoubtXids.push_back(qpidXid); - } - - // if we got here the decoder validated the Xids - status = XA_OK; - - // make sure none are too big, just in case - - for (size_t i = 0; i < nXids; i++) { - Xid& xid = inDoubtXids[i]; - size_t l1 = xid.hasGlobalId() ? xid.getGlobalId().size() : 0; - size_t l2 = xid.hasBranchId() ? xid.getBranchId().size() : 0; - if ((l1 > MAXGTRIDSIZE) || (l2 > MAXBQUALSIZE) || - ((l1 + l2) > XIDDATASIZE)) { - status = XAER_RMERR; - break; - } - } - } - else { - // nXids == 0, the previously cleared inDoubtXids is correctly populated - status = XA_OK; - } - - if (status == XA_OK) - cursor = 0; - } catch (...) { - // TODO: log it - } - - EnterCriticalSection(&rmLock); - active = false; - } - else { - // TMSTARTRSCAN not set, is there an existing scan to work from? - if (cursor == -1) - return XAER_INVAL; - } - - if (status != XA_OK) - return status; - - INT actualCount = count; - if (count > 0) { - int nAvailable = (int) inDoubtXids.size() - cursor; - if (nAvailable < count) - actualCount = nAvailable; - - for (int i = 0; i < actualCount; i++) { - Xid& qpidXid = inDoubtXids[i + cursor]; - QpidToXa(qpidXid, xids[i]); - } - } - - if (flags & TMENDRSCAN) { - cursor = -1; - inDoubtXids.clear(); - } - - return actualCount; -} - - -// Call with lock held - -ResourceManager* findRm(int rmid) { - if (rmMap.find(rmid) == rmMap.end()) { - return NULL; - } - return rmMap[rmid]; -} - - -INT __cdecl xa_open (char *xa_info, int rmid, long flags) { - if (flags & TMASYNC) - return XAER_ASYNC; - - INT rv = XAER_RMERR; - EnterCriticalSection(&rmLock); - - ResourceManager* rmp = findRm(rmid); - if (rmp != NULL) { - // error: already in use - rv = XAER_PROTO; - } - else { - std::string brokerHost; - int brokerPort; - std::string username; - std::string password; - bool ssl; - bool saslPlain; - - if (parseDsn(xa_info, brokerHost, brokerPort, ssl, saslPlain, username, password)) { - - try { - rmp = new ResourceManager(rmid, brokerHost, brokerPort, ssl, saslPlain, username, password); - - rv = rmp->open(); - if (rv != XA_OK) { - delete (rmp); - } - else { - rmMap[rmid] = rmp; - } - } catch (...) {} - } - else { - rv = XAER_INVAL; - } - } - - LeaveCriticalSection(&rmLock); - return rv; -} - - -INT __cdecl xa_close (char *xa_info, int rmid, long flags) { - if (flags & TMASYNC) - return XAER_ASYNC; - - INT rv = XAER_RMERR; - - EnterCriticalSection(&rmLock); - ResourceManager* rmp = findRm(rmid); - - if (rmp == NULL) { - // can close multiple times - rv = XA_OK; - } - else { - rv = rmp->close(); - rmMap.erase(rmid); - try { - delete (rmp); - } catch (...) { - // TODO: log it - } - } - - LeaveCriticalSection(&rmLock); - return rv; -} - - -INT __cdecl xa_commit (XID *xid, int rmid, long flags) { - if (flags & TMASYNC) - return XAER_ASYNC; - - INT rv = XAER_RMFAIL; - - EnterCriticalSection(&rmLock); - ResourceManager* rmp = findRm(rmid); - - if (rmp == NULL) { - rv = XAER_INVAL; - } - else { - rv = rmp->commit(xid); - } - - LeaveCriticalSection(&rmLock); - return rv; -} - - -INT __cdecl xa_rollback (XID *xid, int rmid, long flags) { - if (flags & TMASYNC) - return XAER_ASYNC; - - INT rv = XAER_RMFAIL; - - EnterCriticalSection(&rmLock); - ResourceManager* rmp = findRm(rmid); - - if (rmp == NULL) { - rv = XAER_INVAL; - } - else { - rv = rmp->rollback(xid); - } - - LeaveCriticalSection(&rmLock); - return rv; -} - - -INT __cdecl xa_recover (XID *xids, long count, int rmid, long flags) { - INT rv = XAER_RMFAIL; - - EnterCriticalSection(&rmLock); - ResourceManager* rmp = findRm(rmid); - - if (rmp == NULL) { - rv = XAER_PROTO; - } - else { - rv = rmp->recover(xids, count, flags); - } - - LeaveCriticalSection(&rmLock); - return rv; -} - - -INT __cdecl xa_start (XID *xid, int rmid, long flags) { - // not used in recovery - return XAER_PROTO; -} - - -INT __cdecl xa_end (XID *xid, int rmid, long flags) { - // not used in recovery - return XAER_PROTO; -} - - -INT __cdecl xa_prepare (XID *xid, int rmid, long flags) { - // not used in recovery - return XAER_PROTO; -} - - -INT __cdecl xa_forget (XID *xid, int rmid, long flags) { - // not used in recovery - return XAER_PROTO; -} - - -INT __cdecl xa_complete (int *handle, int *retval, int rmid, long flags) { - // not used in recovery - return XAER_PROTO; -} - - - -xa_switch_t xaSwitch; - -HRESULT __cdecl GetQpidXaSwitch (DWORD XaSwitchFlags, xa_switch_t ** ppXaSwitch) -{ - // needed for now due to implicit use of FreeLibrary in WSACleanup() in qpid/cpp/src/qpid/sys/windows/Socket.cpp - pinDll(); - - if (xaSwitch.xa_open_entry != xa_open) { - - xaSwitch.xa_open_entry = xa_open; - xaSwitch.xa_close_entry = xa_close; - xaSwitch.xa_start_entry = xa_start; - xaSwitch.xa_end_entry = xa_end; - xaSwitch.xa_prepare_entry = xa_prepare; - xaSwitch.xa_commit_entry = xa_commit; - xaSwitch.xa_rollback_entry = xa_rollback; - xaSwitch.xa_recover_entry = xa_recover; - xaSwitch.xa_forget_entry = xa_forget; - xaSwitch.xa_complete_entry = xa_complete; - - strcpy_s(xaSwitch.name, RMNAMESZ, "qpidxarm"); - xaSwitch.flags = TMNOMIGRATE; - xaSwitch.version = 0; - } - *ppXaSwitch = &xaSwitch; - return S_OK; -} - - - - -}}} // namespace Apache::Qpid::DtcPlugin - - -// GetXaSwitch - -extern "C" { - - __declspec(dllexport) HRESULT __cdecl GetXaSwitch (DWORD XaSwitchFlags, xa_switch_t ** ppXaSwitch) - { - return Apache::Qpid::DtcPlugin::GetQpidXaSwitch (XaSwitchFlags, ppXaSwitch); - } -} - - -// dllmain - -BOOL APIENTRY DllMain( HMODULE hModule, - DWORD ul_reason_for_call, - LPVOID lpReserved) -{ - - switch (ul_reason_for_call) - { - case DLL_PROCESS_ATTACH: - InitializeCriticalSection(&Apache::Qpid::DtcPlugin::rmLock); - Apache::Qpid::DtcPlugin::thisDll = hModule; - break; - - case DLL_PROCESS_DETACH: - DeleteCriticalSection(&Apache::Qpid::DtcPlugin::rmLock); - break; - - case DLL_THREAD_ATTACH: - case DLL_THREAD_DETACH: - break; - } - return TRUE; -} - diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp deleted file mode 100644 index 1bc9a15d92..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp +++ /dev/null @@ -1,276 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#include -#include -#include - -#include "qpid/client/AsyncSession.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Message.h" -#include "qpid/client/MessageListener.h" -#include "qpid/framing/FrameSet.h" - -#include "AmqpConnection.h" -#include "AmqpSession.h" -#include "QpidMarshal.h" -#include "QpidException.h" -#include "DtxResourceManager.h" -#include "XaTransaction.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Runtime::InteropServices; -using namespace msclr; - -using namespace qpid::client; -using namespace std; - - -// Note on locks: Use thisLock for fast counting and idle/busy -// notifications. Use the "sessions" list to serialize session -// creation/reaping and overall tear down. - - -AmqpConnection::AmqpConnection(String^ server, int port) : - connectionp(NULL), - busyCount(0), - disposed(false) -{ - initialize (server, port, false, false, nullptr, nullptr); -} - -AmqpConnection::AmqpConnection(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password) : - connectionp(NULL), - busyCount(0), - disposed(false) -{ - initialize (server, port, ssl, saslPlain, username, password); -} - -void AmqpConnection::initialize(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password) -{ - if (server == nullptr) - throw gcnew ArgumentNullException("AMQP server"); - if (saslPlain) { - if (username == nullptr) - throw gcnew ArgumentNullException("username"); - if (username == nullptr) - throw gcnew ArgumentNullException("password"); - } - - bool success = false; - System::Exception^ openException = nullptr; - sessions = gcnew Collections::Generic::List(); - thisLock = gcnew Object(); - - try { - connectionp = new Connection; - - if (ssl || saslPlain) { - ConnectionSettings proposedSettings; - proposedSettings.host = QpidMarshal::ToNative(server); - proposedSettings.port = port; - if (ssl) - proposedSettings.protocol = "ssl"; - - if (saslPlain) { - proposedSettings.username = QpidMarshal::ToNative(username); - proposedSettings.password = QpidMarshal::ToNative(password); - proposedSettings.mechanism = "PLAIN"; - } - - connectionp->open (proposedSettings); - } - else { - connectionp->open (QpidMarshal::ToNative(server), port); - } - - // TODO: registerFailureCallback for failover - success = true; - const ConnectionSettings& settings = connectionp->getNegotiatedSettings(); - this->maxFrameSize = settings.maxFrameSize; - this->host = server; - this->port = port; - this->ssl = ssl; - this->saslPlain = saslPlain; - this->username = username; - this->password = password; - this->isOpen = true; - } catch (const qpid::Exception& error) { - String^ errmsg = gcnew String(error.what()); - openException = gcnew QpidException(errmsg); - } finally { - if (!success) { - Cleanup(); - if (openException == nullptr) { - openException = gcnew QpidException ("unknown connection failure"); - } - throw openException; - } - } -} - -AmqpConnection^ AmqpConnection::Clone() { - if (disposed) - throw gcnew ObjectDisposedException("AmqpConnection.Clone"); - return gcnew AmqpConnection (this->host, this->port, this->ssl, this->saslPlain, this->username, this->password); -} - -void AmqpConnection::Cleanup() -{ - { - lock l(sessions); - if (disposed) - return; - disposed = true; - } - - try { - // let the child sessions clean up - - for each(AmqpSession^ s in sessions) { - s->ConnectionClosed(); - } - } - finally - { - if (connectionp != NULL) { - isOpen = false; - connectionp->close(); - delete connectionp; - connectionp = NULL; - } - } -} - -AmqpConnection::~AmqpConnection() -{ - Cleanup(); -} - -AmqpConnection::!AmqpConnection() -{ - Cleanup(); -} - -void AmqpConnection::Close() -{ - // Simulate Dispose()... - Cleanup(); - GC::SuppressFinalize(this); -} - -AmqpSession^ AmqpConnection::CreateSession() -{ - lock l(sessions); - if (disposed) { - throw gcnew ObjectDisposedException("AmqpConnection"); - } - AmqpSession^ session = gcnew AmqpSession(this, connectionp); - sessions->Add(session); - return session; -} - -// called whenever a child session becomes newly busy (a first reader or writer since last idle) - -void AmqpConnection::NotifyBusy() -{ - bool changed = false; - { - lock l(thisLock); - if (busyCount++ == 0) - changed = true; - } -} - -// called whenever a child session becomes newly idle (a last reader or writer has closed) -// The connection is idle when none of its child sessions are busy - -void AmqpConnection::NotifyIdle() -{ - bool connectionIdle = false; - { - lock l(thisLock); - if (--busyCount == 0) - connectionIdle = true; - } - if (connectionIdle) { - OnConnectionIdle(this, System::EventArgs::Empty); - } -} - -void HexAppend(StringBuilder^ sb, String^ s) { - if (s->Length > 0) { - array^ bytes = Encoding::UTF8->GetBytes(s); - for each (unsigned char b in bytes) { - sb->Append(String::Format("{0:x2}", b)); - } - } - sb->Append("."); -} - - -// Note: any change to this format has to be reflected in the DTC plugin's xa_open() -// for now: "QPIDdsnV2.port.host.instance_id.SSL_tf.SASL_mech.username.password" -// This extended info is needed so that the DTC can make a separate connection to the broker -// for recovery. - -String^ AmqpConnection::DataSourceName::get() { - if (dataSourceName == nullptr) { - StringBuilder^ sb = gcnew StringBuilder(); - sb->Append("QPIDdsnV2."); - - sb->Append(this->port); - sb->Append("."); - - HexAppend(sb, this->host); - - sb->Append(System::Diagnostics::Process::GetCurrentProcess()->Id); - sb->Append("-"); - sb->Append(AppDomain::CurrentDomain->Id); - sb->Append("."); - - if (this->ssl) - sb->Append("T"); - else - sb->Append("F"); - sb->Append("."); - - if (this->saslPlain) { - sb->Append("P."); - HexAppend(sb, this->username); - HexAppend(sb, this->password); - } - else { - // SASL anonymous - sb->Append("A."); - } - - dataSourceName = sb->ToString(); - } - return dataSourceName; -} - - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h deleted file mode 100644 index ef4d0e3f37..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h +++ /dev/null @@ -1,97 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#pragma once - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace std; -using namespace qpid::client; - -ref class AmqpSession; -ref class DtxResourceManager; - -public delegate void ConnectionIdleEventHandler(Object^ sender, EventArgs^ eventArgs); - -public ref class AmqpConnection -{ -private: - Connection* connectionp; - bool disposed; - Collections::Generic::List^ sessions; - bool isOpen; - int busyCount; - int maxFrameSize; - DtxResourceManager^ dtxResourceManager; - // unique string used for distributed transactions - String^ dataSourceName; - Object ^thisLock; - - // properties needed to allow DTC to do transactions (see DataSourceName - String^ host; - int port; - bool ssl; - bool saslPlain; - String^ username; - String^ password; - - void Cleanup(); - void initialize (System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password); - - internal: - void NotifyBusy(); - void NotifyIdle(); - AmqpConnection^ Clone(); - - property int MaxFrameSize { - int get () { return maxFrameSize; } - } - - property DtxResourceManager^ CachedResourceManager { - DtxResourceManager^ get () { return dtxResourceManager; } - void set (DtxResourceManager^ value) { dtxResourceManager = value; } - } - - property String^ DataSourceName { - String^ get(); - } - -public: - AmqpConnection(System::String^ server, int port); - AmqpConnection(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password); - ~AmqpConnection(); - !AmqpConnection(); - void Close(); - AmqpSession^ CreateSession(); - event ConnectionIdleEventHandler^ OnConnectionIdle; - - property bool IsOpen { - bool get() { return isOpen; } - }; - - property bool IsIdle { - bool get() { return (busyCount == 0); } - } -}; - - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp deleted file mode 100644 index 5c333aff60..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp +++ /dev/null @@ -1,76 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#include -#include - -#include "qpid/client/AsyncSession.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/framing/AMQFrame.h" - -#include "MessageBodyStream.h" -#include "AmqpMessage.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Threading; -using namespace msclr; - -using namespace Apache::Qpid::AmqpTypes; - -AmqpMessage::AmqpMessage(MessageBodyStream ^mbs) : - messageBodyStream(mbs), - disposed(false) -{ -} - -void AmqpMessage::Cleanup() -{ - { - lock l(this); - if (disposed) - return; - - disposed = true; - } - - messageBodyStream->Close(); -} - -AmqpMessage::~AmqpMessage() -{ - Cleanup(); -} - -AmqpMessage::!AmqpMessage() -{ - Cleanup(); -} - -void AmqpMessage::Close() -{ - // Simulate Dispose()... - Cleanup(); - GC::SuppressFinalize(this); -} - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h deleted file mode 100644 index f0801d30dc..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h +++ /dev/null @@ -1,61 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#pragma once - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Runtime::InteropServices; - -using namespace qpid::client; -using namespace std; - - - -public ref class AmqpMessage -{ -private: - MessageBodyStream^ messageBodyStream; - AmqpTypes::AmqpProperties^ amqpProperties; - bool disposed; - void Cleanup(); - -internal: - AmqpMessage(MessageBodyStream ^bstream); - -public: - ~AmqpMessage(); - !AmqpMessage(); - void Close(); - - property AmqpTypes::AmqpProperties^ Properties { - AmqpTypes::AmqpProperties^ get () { return amqpProperties; } - void set(AmqpTypes::AmqpProperties^ p) { amqpProperties = p; } - } - - property System::IO::Stream^ BodyStream { - System::IO::Stream^ get() { return messageBodyStream; } - } -}; - - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp deleted file mode 100644 index ac7c777d1f..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp +++ /dev/null @@ -1,633 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#include -#include -#include - -#include "qpid/client/AsyncSession.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/Connection.h" -#include "qpid/client/SessionImpl.h" -#include "qpid/client/SessionBase_0_10Access.h" -#include "qpid/client/Message.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/client/Future.h" -#include "qpid/framing/Xid.h" - -#include "AmqpConnection.h" -#include "AmqpSession.h" -#include "AmqpMessage.h" -#include "MessageBodyStream.h" -#include "InputLink.h" -#include "OutputLink.h" -#include "QpidMarshal.h" -#include "QpidException.h" -#include "XaTransaction.h" -#include "DtxResourceManager.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Runtime::InteropServices; -using namespace System::Transactions; -using namespace msclr; - -using namespace qpid::client; -using namespace std; - - -AmqpSession::AmqpSession(AmqpConnection^ conn, qpid::client::Connection* qpidConnectionp) : - connection(conn), - sessionp(NULL), - sessionImplp(NULL), - subs_mgrp(NULL), - helperRunning(false), - openCount(0), - syncCount(0), - closing(false), - dtxEnabled(false) -{ - bool success = false; - try { - sessionp = new qpid::client::AsyncSession; - *sessionp = qpidConnectionp->newSession(); - subs_mgrp = new SubscriptionManager (*sessionp); - waiters = gcnew Collections::Generic::List(); - sessionLock = waiters; // waiters convenient and not publicly visible - openCloseLock = gcnew Object(); - success = true; - } finally { - if (!success) { - Cleanup(); - // TODO: include inner exception information - throw gcnew QpidException ("session creation failure"); - } - } -} - - -void AmqpSession::Cleanup() -{ - bool connected = connection->IsOpen; - - if (subs_mgrp != NULL) { - if (connected) - subs_mgrp->stop(); - delete subs_mgrp; - subs_mgrp = NULL; - } - - if (sessionp != NULL) { - if (connected) { - sessionp->close(); - } - delete sessionp; - sessionp = NULL; - sessionImplp = NULL; - } -} - - -static qpid::framing::Xid& getXid(XaTransaction^ xaTx) -{ - return *((qpid::framing::Xid *)xaTx->XidHandle.ToPointer()); -} - - -void AmqpSession::CheckOpen() -{ - if (closing) - throw gcnew ObjectDisposedException("AmqpSession"); -} - - -// Called by the parent AmqpConnection - -void AmqpSession::ConnectionClosed() -{ - lock l(sessionLock); - - if (closing) - return; - - closing = true; - - if (connection->IsOpen) { - // send closing handshakes... - - if (dtxEnabled) { - // session may close before all its transactions complete, at least force the phase 0 flush - if (pendingTransactions->Count > 0) { - array^ txArray = pendingTransactions->ToArray(); - l.release(); - for each (XaTransaction^ xaTx in txArray) { - //xaTx->SessionClosing(this); - xaTx->WaitForCompletion(); - } - l.acquire(); - } - } - - WaitLastSync (%l); - // Assert pendingTransactions->Count == 0 - - if (openXaTransaction != nullptr) { - // send final dtxend - sessionp->dtxEnd(getXid(openXaTransaction), false, true, false); - openXaTransaction = nullptr; - openSystemTransaction = nullptr; - // this operation will complete by the time Cleanup() returns - } - } - - Cleanup(); -} - -InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue) -{ - return CreateInputLink(sourceQueue, true, false, nullptr, nullptr); -} - -InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary, - System::String^ filterKey, System::String^ exchange) -{ - lock ocl(openCloseLock); - lock l(sessionLock); - CheckOpen(); - - InputLink^ link = gcnew InputLink (this, sourceQueue, sessionp, subs_mgrp, exclusive, temporary, filterKey, exchange); - { - if (openCount == 0) { - l.release(); - connection->NotifyBusy(); - } - openCount++; - } - return link; -} - -OutputLink^ AmqpSession::CreateOutputLink(System::String^ targetQueue) -{ - lock ocl(openCloseLock); - lock l(sessionLock); - CheckOpen(); - - OutputLink^ link = gcnew OutputLink (this, targetQueue); - - if (sessionImplp == NULL) { - // not needed unless sending messages - SessionBase_0_10Access sa(*sessionp); - boost::shared_ptr sip = sa.get(); - sessionImplp = sip.get(); - } - - if (openCount == 0) { - l.release(); - connection->NotifyBusy(); - } - openCount++; - - return link; -} - - -// called whenever a child InputLink or OutputLink is closed or finalized -void AmqpSession::NotifyClosed() -{ - lock ocl(openCloseLock); - openCount--; - if (openCount == 0) { - connection->NotifyIdle(); - } -} - - -CompletionWaiter^ AmqpSession::SendMessage (System::String^ queue, MessageBodyStream ^mbody, TimeSpan timeout, bool async, AsyncCallback^ callback, Object^ state) -{ - lock l(sessionLock); - - // delimit with session dtx commands depending on the transaction context - UpdateTransactionState(%l); - - CheckOpen(); - - bool syncPending = false; - - // create an AMQP message.transfer command to use with the partial frameset from the MessageBodyStream - - std::string exname = QpidMarshal::ToNative(queue); - FrameSet *framesetp = (FrameSet *) mbody->GetFrameSet().ToPointer(); - uint8_t acceptMode=1; - uint8_t acquireMode=0; - MessageTransferBody mtcmd(ProtocolVersion(0,10), exname, acceptMode, acquireMode); - // ask for a command completion - mtcmd.setSync(true); - - //send it - - Future *futurep = NULL; - try { - futurep = new Future(sessionImplp->send(mtcmd, *framesetp)); - - CompletionWaiter^ waiter = nullptr; - if (async || (timeout != TimeSpan::MaxValue)) { - waiter = gcnew CompletionWaiter(this, timeout, (IntPtr) futurep, callback, state); - // waiter is responsible for releasing the Future native resource - futurep = NULL; - addWaiter(waiter); - return waiter; - } - - // synchronous send with no timeout: no need to involve the asyncHelper thread - - IncrementSyncs(); - syncPending = true; - l.release(); - internalWaitForCompletion((IntPtr) futurep); - } - finally { - if (syncPending) { - if (!l.is_locked()) - l.acquire(); - DecrementSyncs(); - } - if (futurep != NULL) - delete (futurep); - } - return nullptr; -} - - -void AmqpSession::Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey) -{ - lock l(sessionLock); - CheckOpen(); - - sessionp->exchangeBind(arg::queue=QpidMarshal::ToNative(queue), - arg::exchange=QpidMarshal::ToNative(exchange), - arg::bindingKey=QpidMarshal::ToNative(filterKey)); - -} - - -void AmqpSession::internalWaitForCompletion(IntPtr fp) -{ - Debug::Assert(syncCount > 0, "sync counter mismatch"); - - // Qpid native lib call to wait for the command completion - ((Future *)fp.ToPointer())->wait(*sessionImplp); -} - -// call with lock held -void AmqpSession::addWaiter(CompletionWaiter^ waiter) -{ - IncrementSyncs(); - waiters->Add(waiter); - if (!helperRunning) { - helperRunning = true; - ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &AmqpSession::asyncHelper)); - } -} - - -void AmqpSession::removeWaiter(CompletionWaiter^ waiter) -{ - // a waiter can be removed from anywhere in the list if timed out - - lock l(sessionLock); - int idx = waiters->IndexOf(waiter); - if (idx == -1) { - // TODO: assert or log - } - else { - waiters->RemoveAt(idx); - DecrementSyncs(); - } -} - - -// process CompletionWaiter list one at a time. - -void AmqpSession::asyncHelper(Object ^unused) -{ - lock l(sessionLock); - - while (true) { - if (waiters->Count == 0) { - helperRunning = false; - return; - } - - CompletionWaiter^ waiter = waiters[0]; - l.release(); - // can block, but for short time - // the waiter removes itself from the list, possibly as the timer thread on timeout - waiter->Run(); - l.acquire(); - } -} - -bool AmqpSession::MessageStop(std::string &name) -{ - lock l(sessionLock); - - if (closing) - return false; - - sessionp->messageStop(name, true); - return true; -} - -void AmqpSession::AcceptAndComplete(SequenceSet& transfers, bool browsing) -{ - lock l(sessionLock); - - if (!browsing) { - // delimit with session dtx commands depending on the transaction context - UpdateTransactionState(%l); - } - - CheckOpen(); - - sessionp->markCompleted(transfers, false); - if (!browsing) - sessionp->messageAccept(transfers, false); -} - - -// call with session lock held - -void AmqpSession::UpdateTransactionState(lock^ slock) -{ - Transaction^ currentTx = Transaction::Current; - if ((currentTx == nullptr) && !dtxEnabled) { - // no transaction scope and no previous dtx work to monitor - return; - } - - if (currentTx == openSystemTransaction) { - // no change - return; - } - - if (!dtxEnabled) { - // AMQP requires that this be the first dtx-related command on the session - sessionp->dtxSelect(false); - dtxEnabled = true; - pendingTransactions = gcnew Collections::Generic::List(); - } - - bool notify = false; // unless the System.Transaction is no longer active - XaTransaction^ oldXaTx = openXaTransaction; - if (openSystemTransaction != nullptr) { - // The application may start a new transaction before the phase0 on rollback - try { - if (openSystemTransaction->TransactionInformation->Status != TransactionStatus::Active) { - notify = true; - } - } catch (System::ObjectDisposedException^) { - notify = true; - } - } - - slock->release(); - // only use stack variables until lock re-acquired - - if (notify) { - // will do call back to all enlisted sessions. call with session lock released. - // If NotifyPhase0() wins the race to start phase 0, openXaTransaction will be null - oldXaTx->NotifyPhase0(); - } - - XaTransaction^ newXaTx = nullptr; - if (currentTx != nullptr) { - // This must be called with locks released. The DTC and System.Transactions methods that - // will be called hold locks that interfere with the ITransactionResourceAsync callbacks. - newXaTx = DtxResourceManager::GetXaTransaction(this, currentTx); - } - - slock->acquire(); - - if (closing) - return; - - if (openSystemTransaction != nullptr) { - // some other transaction has the dtx window open - // close the XID window, suspend = true... in case it is used again - sessionp->dtxEnd(getXid(openXaTransaction), false, true, false); - openSystemTransaction = nullptr; - openXaTransaction = nullptr; - } - - - // Call enlist with session lock held. The XaTransaction will call DtxStart before returning. - if (newXaTx != nullptr) { - if (!pendingTransactions->Contains(newXaTx)) { - pendingTransactions->Add(newXaTx); - } - - newXaTx->Enlist(this); - } - - openXaTransaction = newXaTx; - openSystemTransaction = currentTx; -} - - -typedef TypedResult XaResultCompletion; - - -// send the required closing dtx.End before Phase 1 - -IntPtr AmqpSession::BeginPhase0Flush(XaTransaction ^xaTx) { - - lock l(sessionLock); - IntPtr completionp = IntPtr::Zero; - try { - if (sessionp != NULL) { - - // proceed even if "closing == true", the phase 0 is part of the transition from closing to closed - - if (xaTx != openXaTransaction) { - // a different transaction (or none) is in scope, so xaTx was previously suspended. - // must re-open it to close it properly - if (openXaTransaction != nullptr) { - // suspend the session's current pending transaction - // it wil be reopened in a future enlistment or phase 0 flush. - sessionp->dtxEnd(getXid(openXaTransaction), false, true, false); - } - // resuming - sessionp->dtxStart(getXid(xaTx), false, true, false); - } - - // the closing (i.e. non-suspended) dtxEnd happens here (exactly once for a given transaction) - // set the sync bit since phase0 is a precondition to prepare or abort - completionp = (IntPtr) new XaResultCompletion(sessionp->dtxEnd(getXid(xaTx), false, false, true)); - IncrementSyncs(); - } - } - catch (System::Exception^ ) { - // all the caller wants to know is if completionp is non-null - } - - openXaTransaction = nullptr; - openSystemTransaction = nullptr; - return completionp; -} - - -void AmqpSession::EndPhase0Flush(XaTransaction ^xaTx, IntPtr intptr) { - XaResultCompletion *completionp = (XaResultCompletion *) intptr.ToPointer(); - lock l(sessionLock); - - if (completionp != NULL) { - try { - l.release(); - completionp->wait(); - pendingTransactions->Remove(xaTx); - } - catch (System::Exception^) { - // connection closed or network drop - } - finally { - l.acquire(); - DecrementSyncs(); - delete completionp; - } - } -} - - -IntPtr AmqpSession::DtxStart(IntPtr ip, bool join, bool resume) { - // called with session lock held (as a callback from the Enlist()) - // The XaTransaction knows if this should be the originating dtxStart, or a join/resume - IntPtr rv = IntPtr::Zero; - qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer(); - if (join || resume) { - sessionp->dtxStart(*xidp, join, resume, false); - } - else { - // The XaTransaction needs to track when the first dtxStart completes to safely request a join - IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs - rv = (IntPtr) new XaResultCompletion(sessionp->dtxStart(*xidp, join, resume, false)); - } - - return rv; -} - - -IntPtr AmqpSession::DtxPrepare(IntPtr ip) { - qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer(); - lock l(sessionLock); - - if (closing) - return IntPtr::Zero; - - IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs - return (IntPtr) new XaResultCompletion(sessionp->dtxPrepare(*xidp, true)); -} - - -IntPtr AmqpSession::DtxCommit(IntPtr ip, bool onePhase) { - qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer(); - lock l(sessionLock); - - if (closing) - return IntPtr::Zero; - - IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs - return (IntPtr) new XaResultCompletion(sessionp->dtxCommit(*xidp, onePhase, true)); -} - - -IntPtr AmqpSession::DtxRollback(IntPtr ip) { - qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer(); - lock l(sessionLock); - if (closing) - return IntPtr::Zero; - - IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs - - return (IntPtr) new XaResultCompletion(sessionp->dtxRollback(*xidp, true)); -} - - -//call with lock held -void AmqpSession::IncrementSyncs() { - syncCount++; -} - - -//call with lock held -void AmqpSession::DecrementSyncs() { - syncCount--; - Debug::Assert(syncCount >= 0, "sync counter underrun"); - if (syncCount == 0) { - if (closeWaitHandle != nullptr) { - // now OK to move from closing to closed - closeWaitHandle->Set(); - } - } -} - - -// call with lock held -void AmqpSession::WaitLastSync(lock ^l) { - if (syncCount == 0) - return; - if (AppDomain::CurrentDomain->IsFinalizingForUnload()) { - // a wait would be a hang. No more syncs coming - return; - } - if (closeWaitHandle == nullptr) - closeWaitHandle = gcnew ManualResetEvent(false); - l->release(); - closeWaitHandle->WaitOne(); - l->acquire(); -} - - -void AmqpSession::ReleaseCompletion(IntPtr completion) { - lock l(sessionLock); - DecrementSyncs(); - delete completion.ToPointer(); -} - - -// Non-exclusive borrowing for a "brief" period. I.e. several synced -// commands (address resolution) - -IntPtr AmqpSession::BorrowNativeSession() { - lock l(sessionLock); - if (closing) - return IntPtr::Zero; - - IncrementSyncs(); - return (IntPtr) sessionp; -} - -void AmqpSession::ReturnNativeSession() { - lock l(sessionLock); - DecrementSyncs(); -} - -}}} // namespace Apache::Qpid::Cli diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h deleted file mode 100644 index 7a49496805..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h +++ /dev/null @@ -1,109 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#pragma once - -#include "AmqpConnection.h" -#include "MessageBodyStream.h" -#include "CompletionWaiter.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Runtime::InteropServices; -using namespace System::Transactions; -using namespace System::Diagnostics; - - -using namespace qpid::client; -using namespace std; - -ref class InputLink; -ref class OutputLink; -ref class XaTransaction; - -public ref class AmqpSession -{ -private: - Object^ sessionLock; - Object^ openCloseLock; - AmqpConnection^ connection; - AsyncSession* sessionp; - SessionImpl* sessionImplp; - SubscriptionManager* subs_mgrp; - Collections::Generic::List^ waiters; - bool helperRunning; - - // number of active InputLinks and OutputLinks - int openCount; - - // the number of async commands sent to the broker that need completion confirmation - int syncCount; - - bool closing; - ManualResetEvent^ closeWaitHandle; - bool dtxEnabled; - Transaction^ openSystemTransaction; - XaTransaction^ openXaTransaction; - Collections::Generic::List^ pendingTransactions; - - void Cleanup(); - void CheckOpen(); - void asyncHelper(Object ^); - void addWaiter(CompletionWaiter^ waiter); - void UpdateTransactionState(msclr::lock^ sessionLock); - void IncrementSyncs(); - void DecrementSyncs(); - void WaitLastSync(msclr::lock^ l); - -public: - OutputLink^ CreateOutputLink(System::String^ targetQueue); - InputLink^ CreateInputLink(System::String^ sourceQueue); - - // 0-10 specific support; deprecated in favor of Qpid messaging addresses - InputLink^ CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary, System::String^ filterKey, System::String^ exchange); - void Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey); - -internal: - AmqpSession(AmqpConnection^ connection, qpid::client::Connection* qpidConnection); - void NotifyClosed(); - CompletionWaiter^ SendMessage (System::String^ queue, MessageBodyStream ^mbody, TimeSpan timeout, bool async, AsyncCallback^ callback, Object^ state); - void ConnectionClosed(); - void internalWaitForCompletion(IntPtr future); - void removeWaiter(CompletionWaiter^ waiter); - bool MessageStop(std::string &name); - void AcceptAndComplete(SequenceSet& transfers, bool browsing); - IntPtr BeginPhase0Flush(XaTransaction^); - void EndPhase0Flush(XaTransaction^, IntPtr); - IntPtr DtxStart(IntPtr xidp, bool, bool); - IntPtr DtxPrepare(IntPtr xidp); - IntPtr DtxCommit(IntPtr xidp, bool onePhase); - IntPtr DtxRollback(IntPtr xidp); - void ReleaseCompletion(IntPtr completion); - IntPtr BorrowNativeSession(); - void ReturnNativeSession(); - - property AmqpConnection^ Connection { - AmqpConnection^ get () { return connection; } - } -}; - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp deleted file mode 100644 index 91c23ae30a..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp +++ /dev/null @@ -1,57 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -using namespace System; -using namespace System::Reflection; -using namespace System::Runtime::CompilerServices; -using namespace System::Runtime::InteropServices; -using namespace System::Security::Permissions; - -// -// General Information about an assembly is controlled through the following -// set of attributes. Change these attribute values to modify the information -// associated with an assembly. -// -[assembly:AssemblyTitleAttribute("Apache.Qpid.Interop")]; -[assembly:AssemblyDescriptionAttribute("")]; -[assembly:AssemblyConfigurationAttribute("")]; -[assembly:AssemblyCompanyAttribute("")]; -[assembly:AssemblyProductAttribute("")]; -[assembly:AssemblyCopyrightAttribute("")]; -[assembly:AssemblyTrademarkAttribute("")]; -[assembly:AssemblyCultureAttribute("")]; - -// -// Version information for an assembly consists of the following four values: -// -// Major Version -// Minor Version -// Build Number -// Revision -// -// You can specify all the value or you can default the Revision and Build Numbers -// by using the '*' as shown below: - -[assembly:AssemblyVersionAttribute("1.0.*")]; - -[assembly:ComVisible(false)]; - -[assembly:CLSCompliantAttribute(true)]; - -[assembly:SecurityPermission(SecurityAction::RequestMinimum, UnmanagedCode = true)]; diff --git a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp deleted file mode 100644 index e39ee1b1ae..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp +++ /dev/null @@ -1,145 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#include -#include - -#include "qpid/client/AsyncSession.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Message.h" -#include "qpid/client/MessageListener.h" -#include "qpid/client/Demux.h" -#include "qpid/client/SessionImpl.h" -#include "qpid/client/SessionBase_0_10Access.h" - -#include "MessageBodyStream.h" -#include "AmqpMessage.h" -#include "AmqpSession.h" -#include "InputLink.h" -#include "CompletionWaiter.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Threading; -using namespace msclr; - -// A class to provide IAsyncResult semantics for a qpid AsyncSession command (i.e. 0-10 messageTransfer) -// when the client session receives a "Completion" notification from the Broker. - - -CompletionWaiter::CompletionWaiter(AmqpSession^ parent, TimeSpan timeSpan, IntPtr future, AsyncCallback^ callback, Object^ state) -{ - this->qpidFuture = future; - this->asyncCallback = callback; - this->state = state; - this->parent = parent; - this->thisLock = gcnew Object(); - // do this after the Completion Waiter is fully initialized, in case of - // very small timespan - if (timeSpan != TimeSpan::MaxValue) { - this->timer = gcnew Timer(timeoutCallback, this, timeSpan, TimeSpan::FromMilliseconds(-1)); - } -} - - -void CompletionWaiter::WaitForCompletion() -{ - if (isCompleted) - return; - - lock l(thisLock); - while (!isCompleted) { - Monitor::Wait(thisLock); - } -} - -void CompletionWaiter::Run() -{ - // no locks required in this method - if (isCompleted) - return; - - try { - // Wait for the arrival of the "AMQP Completion" indication from the Broker - parent->internalWaitForCompletion(qpidFuture); - } - catch (System::Exception^ e) { - runException = e; - } - finally { - delete(qpidFuture.ToPointer()); - qpidFuture = (IntPtr) NULL; - } - - if (timer != nullptr) { - timer->~Timer(); - timer = nullptr; - } - - Complete(false); -} - - -// "Complete" here means complete the AsyncResult, which may precede broker "command completion" if timed out - -void CompletionWaiter::Complete(bool isTimerThread) -{ - lock l(thisLock); - if (isCompleted) - return; - - isCompleted = true; - if (isTimerThread) - timedOut = true; - - Monitor::PulseAll(thisLock); - - // do this check and signal while locked - if (asyncWaitHandle != nullptr) - asyncWaitHandle->Set(); - - l.release(); - - parent->removeWaiter(this); - - if (asyncCallback != nullptr) { - // guard against application callback exception - try { - asyncCallback(this); - } - catch (System::Exception^) { - // log it? - } - } -} - - -void CompletionWaiter::TimeoutCallback(Object^ state) -{ - CompletionWaiter^ waiter = (CompletionWaiter^) state; - waiter->Complete(true); -} - - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h deleted file mode 100644 index 88880c3721..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h +++ /dev/null @@ -1,98 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#pragma once - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Threading; - -public ref class CompletionWaiter : IAsyncResult -{ -private: - bool timedOut; - // has an owner thread - bool assigned; - System::Exception^ runException; - AsyncCallback^ asyncCallback; - Threading::Timer ^timer; - bool isCompleted; - Object^ state; - Object^ thisLock; - ManualResetEvent^ asyncWaitHandle; - AmqpSession^ parent; - IntPtr qpidFuture; - void Complete(bool isTimerThread); - static void TimeoutCallback(Object^ state); - static TimerCallback^ timeoutCallback = gcnew TimerCallback(CompletionWaiter::TimeoutCallback); - - internal: - CompletionWaiter(AmqpSession^ parent, TimeSpan timeSpan, IntPtr future, AsyncCallback ^callback, Object^ state); - - void Run(); - void WaitForCompletion(); - - property bool Assigned { - bool get () { return assigned; } - } - - property bool TimedOut { - bool get () { return timedOut; } - } - - - public: - - virtual property bool IsCompleted { - bool get () { return isCompleted; } - } - - virtual property bool CompletedSynchronously { - bool get () { return false; } - } - - virtual property WaitHandle^ AsyncWaitHandle { - WaitHandle^ get () { - if (asyncWaitHandle != nullptr) { - return asyncWaitHandle; - } - - msclr::lock l(thisLock); - if (asyncWaitHandle == nullptr) { - asyncWaitHandle = gcnew ManualResetEvent(isCompleted); - } - return asyncWaitHandle; - } - } - - - virtual property Object^ AsyncState { - Object^ get () { return state; } - } - - - - -}; - -}}} // namespace Apache::Qpid::Interop - diff --git a/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp b/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp deleted file mode 100644 index 6ea31f8401..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp +++ /dev/null @@ -1,285 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "qpid/client/AsyncSession.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Message.h" -#include "qpid/client/MessageListener.h" -#include "qpid/framing/FrameSet.h" - -#include "AmqpConnection.h" -#include "AmqpSession.h" -#include "DtxResourceManager.h" -#include "XaTransaction.h" -#include "QpidException.h" -#include "QpidMarshal.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Runtime::InteropServices; -using namespace System::Transactions; -using namespace msclr; - - -/* - * There is one DtxResourceManager per broker and per application process. - * - * Each RM manages a collection of active XaTransaction objects. Participating AmqpSessions enlist - * (or re-enlist) with an XaTransaction indexed by the corresponding System.Transaction object. The - * RM maintains its own AmqpSession for sending 2PC commnds (dtxPrepare, dtxCommit etc.). The - * XaTransaction object works through the lifecycle of the Transaction, including prompting the - * enlisted sessions to send their delimiting dtxEnd commands. - * - * A separate DtcPlugin.cpp file provides the recovery logic when needed in a library named - * qpidxarm.dll. The MSDTC maintans recovery info in its log and tracks when there may be - * transactions in doubt. See the documentation for IDtcToXaHelperSinglePipe. - * - * To enable transaction support: - * DTC requires a registry key to find the plugin - * [HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\MSDTC\XADLL] qpidxarm.dll -> [path to qpidxarm.dll] - * DTC needs to be configured for XA - * cmdprompt -> dcomcnfg -> Component services -> My Computer -> DTC -> Local DTC -> right click properties -> Security -> Enable XA Transactions - * - */ - -// TODO: provide shutdown mechanism, perhaps callback from Connection Idle for enlisted connections. -// But note that a new RM registration with the DTC is very expensive. - - -DtxResourceManager::DtxResourceManager(AmqpConnection^ appConnection) { - dtcComp = NULL; - xaHelperp = NULL; - rmCookie = 0; - doubtCount = 0; - tmDown = false; - AmqpConnection^ clonedCon = appConnection->Clone(); - dtxControlSession = clonedCon->CreateSession(); - dataSourceName = clonedCon->DataSourceName; - transactionMap = gcnew Collections::Generic::Dictionary(); - - HRESULT hr; - - try { - // instead of pinning this instance, just use tmp stack variables for small stuff - IUnknown* tmp = NULL; - // request the default DTC - hr = DtcGetTransactionManager(NULL, NULL, IID_IUnknown, 0, 0, 0, (void **)&tmp); - if (hr != S_OK) - throw gcnew QpidException("connection failure to DTC service"); - dtcComp = tmp; - - IDtcToXaHelperSinglePipe *tmp2 = NULL; - hr = ((IUnknown *)dtcComp)->QueryInterface(IID_IDtcToXaHelperSinglePipe, (void**) &tmp2); - if (hr != S_OK) - throw gcnew QpidException("DTC XA unavailable"); - xaHelperp = tmp2; - - std::string native_dsn = QpidMarshal::ToNative(dataSourceName); - DWORD tmp3; - - // This call doesn't return until the DTC has opened and closed a connection to the broker - // and written a recovery entry in its log. - hr = ((IDtcToXaHelperSinglePipe *) xaHelperp)->XARMCreate(const_cast(native_dsn.c_str()), "qpidxarm.dll", &tmp3); - if (hr != S_OK) { - switch (hr) { - case E_FAIL: - throw gcnew QpidException("Resource Manager DLL configuration error"); - case E_INVALIDARG: - throw gcnew QpidException("Resource Manager internal error"); - case E_OUTOFMEMORY: - throw gcnew QpidException("Resource Manager out of memory"); - case E_UNEXPECTED: - throw gcnew QpidException("Resource Manager internal failure"); - case XACT_E_TMNOTAVAILABLE: - case XACT_E_CONNECTION_DOWN: - throw gcnew QpidException("MSDTC unavailable"); - - default: - throw gcnew QpidException("Resource Manager Registration failed"); - } - } - - rmCookie = tmp3; - } - finally { - if (rmCookie == 0) { - // undo partial construction - Cleanup(); - } - } -} - - -DtxResourceManager::!DtxResourceManager() { - Cleanup(); -} - - -DtxResourceManager::~DtxResourceManager() { - GC::SuppressFinalize(this); - Cleanup(); -} - - -// Called when the DTC COM proxy sends TMDOWN to a pending XaTransaction -// called once for each outstanding tx - -void DtxResourceManager::TmDown() { - // this block is the only place where both locks are held - lock l1(transactionMap); - lock l2(resourceManagerMap); - if (tmDown) - return; - - tmDown = true; - resourceManagerMap->Remove(this->dataSourceName); - // defer cleanup until last TmDown notification received -} - - - -void DtxResourceManager::Cleanup() { - for each (Collections::Generic::KeyValuePair kvp in transactionMap) { - XaTransaction^ xaTr = kvp.Value; - xaTr->ChildFinalize(); - } - - try { - if (rmCookie != 0) { - // implies no recovery needed - bool cleanSession = (doubtCount == 0) && (transactionMap->Count == 0); - ((IDtcToXaHelperSinglePipe *)xaHelperp)->ReleaseRMCookie(rmCookie, cleanSession); - rmCookie = 0; - } - - - if (xaHelperp != NULL) { - ((IDtcToXaHelperSinglePipe *) xaHelperp)->Release(); - xaHelperp = NULL; - } - - if (dtcComp != NULL) { - ((IUnknown *) dtcComp)->Release(); - dtcComp = NULL; - } - - if (dtxControlSession != nullptr) { - dtxControlSession->Connection->Close(); - } - - } - catch (Exception^) {} -} - - -XaTransaction^ DtxResourceManager::GetXaTransaction(AmqpSession^ appSession, Transaction^ transaction) { - // find or create the RM instance associated with the session's broker - AmqpConnection^ connection = appSession->Connection; - DtxResourceManager^ instance = connection->CachedResourceManager; - - // try cached rm first - if (instance != nullptr) { - XaTransaction^ xaTx = instance->InternalGetXaTransaction(appSession, transaction); - if (xaTx != nullptr) - return xaTx; - else { - // cached version no longer available, force new rm creation - connection->CachedResourceManager = nullptr; - } - } - - lock l(resourceManagerMap); - String^ dsn = connection->DataSourceName; - if (!resourceManagerMap->TryGetValue(dsn, instance)) { - instance = gcnew DtxResourceManager(connection->Clone()); - resourceManagerMap->Add(dsn, instance); - connection->CachedResourceManager = instance; - } - l.release(); - - return instance->InternalGetXaTransaction(appSession, transaction); -} - - -XaTransaction^ DtxResourceManager::InternalGetXaTransaction(AmqpSession^ appSession, Transaction^ transaction) { - // find or create the tx proxy instance associated with the DTC transaction - lock l(transactionMap); - if (tmDown) - return nullptr; - - XaTransaction^ xaTransaction = nullptr; - if (!transactionMap->TryGetValue(transaction, xaTransaction)) { - xaTransaction = gcnew XaTransaction(transaction, (IDtcToXaHelperSinglePipe *) xaHelperp, rmCookie, this); - transactionMap->Add(transaction, xaTransaction); - } - - return xaTransaction; -} - -void DtxResourceManager::Complete(Transaction ^tx) { - lock l(transactionMap); - transactionMap->Remove(tx); - - if (tmDown && (transactionMap->Count == 0)) { - // no more activity on this instance - GC::SuppressFinalize(this); - Cleanup(); - } -} - - -void DtxResourceManager::IncrementDoubt() { - Interlocked::Increment(doubtCount); -} - - -void DtxResourceManager::DecrementDoubt() { - Interlocked::Decrement(doubtCount); -} - - -#ifdef QPID_RECOVERY_TEST_HOOK -void DtxResourceManager::ForceRecovery(Transaction ^tx) { - lock l(resourceManagerMap); - for each (Collections::Generic::KeyValuePair kvp in resourceManagerMap) { - - Collections::Generic::Dictionary^ txmap = kvp.Value->transactionMap; - XaTransaction^ xaTransaction = nullptr; - lock l2(txmap); - if (txmap->TryGetValue(tx, xaTransaction)) { - xaTransaction->ForceRecovery(); - } - } -} -#endif - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.h b/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.h deleted file mode 100644 index 7df491eec2..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.h +++ /dev/null @@ -1,76 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#pragma once - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Threading; -using namespace System::Transactions; - -ref class XaTransaction; - -public ref class DtxResourceManager -{ -private: - // Receive() or WaitForMessage() - AmqpSession^ dtxControlSession; - String^ dataSourceName; - bool consumed; - DWORD rmCookie; - void* xaHelperp; - void* dtcComp; - int doubtCount; - DtxResourceManager(AmqpConnection^); - XaTransaction^ InternalGetXaTransaction (AmqpSession^ session, Transaction^ transaction); - bool tmDown; - - // The active transactions - Collections::Generic::Dictionary^ transactionMap; - - // one resource manager per AMQP broker per process - static Collections::Generic::Dictionary^ resourceManagerMap = - gcnew Collections::Generic::Dictionary(); - - void Cleanup(); - ~DtxResourceManager(); - !DtxResourceManager(); - -internal: - static XaTransaction^ GetXaTransaction (AmqpSession^ session, Transaction^ transaction); - void Complete(Transaction ^tx); - void TmDown(); - - property AmqpSession^ DtxControlSession { - AmqpSession^ get () { return dtxControlSession; } - } - - void IncrementDoubt(); - void DecrementDoubt(); - -#ifdef QPID_RECOVERY_TEST_HOOK -public: - static void ForceRecovery(Transaction ^tx); -#endif -}; - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp deleted file mode 100644 index f8189df0dd..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp +++ /dev/null @@ -1,868 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#include -#include - -#include "qpid/client/AsyncSession.h" -#include "qpid/framing/FieldValue.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Message.h" -#include "qpid/client/MessageListener.h" -#include "qpid/client/Demux.h" -#include "qpid/client/SessionImpl.h" -#include "qpid/client/SessionBase_0_10Access.h" - -#include "MessageBodyStream.h" -#include "AmqpMessage.h" -#include "AmqpSession.h" -#include "InputLink.h" -#include "QpidMarshal.h" -#include "QpidException.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - - -using namespace System; -using namespace System::Runtime::InteropServices; -using namespace System::Threading; -using namespace msclr; - -using namespace qpid::client; -using namespace qpid::framing; - -using namespace std; - -using namespace Apache::Qpid::AmqpTypes; - -// Scalability note: When using async methods, an async helper thread is created -// to block on the Demux BlockingQueue. This design should be revised in line -// with proposed changes to the native library to reduce the number of servicing -// threads for large numbers of subscriptions. - -// synchronization is accomplished with locks, but also by ensuring that only one -// MessageWaiter (the one at the front of the line) is ever active. -// async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch -// thread (who deposits FrameSets into the local queue and is oblivious to the -// managed space locks). - - -// The folowing def must match the "Frames" private typedef. -// TODO, make Qpid-cpp "Frames" definition visible. -typedef qpid::InlineVector FrameSetFrames; - -InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, - qpid::client::AsyncSession *qpidSessionp, qpid::client::SubscriptionManager *qpidSubsMgrp, - bool exclusive, - bool temporary, System::String^ filterKey, System::String^ exchange) : - amqpSession(session), - subscriptionp(NULL), - localQueuep(NULL), - queuePtrp(NULL), - dequeuedFrameSetpp(NULL), - disposed(false), - finalizing(false) -{ - bool success = false; - System::Exception^ linkException = nullptr; - - waiters = gcnew Collections::Generic::List(); - linkLock = waiters; // private and available - subscriptionLock = gcnew Object(); - qpidAddress = QpidAddress::CreateAddress(sourceQueue, true); - qpidAddress->ResolveLink(session); - browsing = qpidAddress->Browsing; - - try { - std::string qname = QpidMarshal::ToNative(qpidAddress->LinkName); - - if (temporary) { - qpidSessionp->queueDeclare(arg::queue=qname, arg::durable=false, arg::autoDelete=true, arg::exclusive=true); - qpidSessionp->exchangeBind(arg::exchange=QpidMarshal::ToNative(exchange), - arg::queue=qname, arg::bindingKey=QpidMarshal::ToNative(filterKey)); - qpidSessionp->sync(); - } - - localQueuep = new LocalQueue; - SubscriptionSettings settings; - settings.flowControl = FlowControl::messageCredit(0); - settings.completionMode = CompletionMode::MANUAL_COMPLETION; - - if (browsing) { - settings.acquireMode = AcquireMode::ACQUIRE_MODE_NOT_ACQUIRED; - settings.acceptMode = AcceptMode::ACCEPT_MODE_NONE; - } - else { - settings.acquireMode = AcquireMode::ACQUIRE_MODE_PRE_ACQUIRED; - settings.acceptMode = AcceptMode::ACCEPT_MODE_EXPLICIT; - } - - Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings); - subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup - - // the roundabout way to obtain localQueuep->queue - SessionBase_0_10Access sa(*qpidSessionp); - boost::shared_ptr simpl = sa.get(); - queuePtrp = new Demux::QueuePtr(simpl->getDemux().get(sub.getName())); - - success = true; - } finally { - if (!success) { - Cleanup(); - linkException = gcnew QpidException ("InputLink creation failure"); - throw linkException; - } - } -} - -// called with lock held -void InputLink::ReleaseNative() -{ - // involves talking to the Broker unless the connection is broken - - if ((subscriptionp != NULL) && !finalizing) { - // TODO: find boost time error on cleanup when in finalizer thread - try { - subscriptionp->cancel(); - } - catch (const std::exception& error) { - // TODO: log this properly - std::cout << "shutdown error " << error.what() << std::endl; - } - } - - // free native mem (or smart pointers) that we own - if (subscriptionp != NULL) { - delete subscriptionp; - subscriptionp = NULL; - } - if (queuePtrp != NULL) { - delete queuePtrp; - queuePtrp = NULL; - } - if (localQueuep != NULL) { - if (!finalizing) { - // TODO: find boost time error on cleanup when in finalizer thread - delete localQueuep; - localQueuep = NULL; - } - } - if (dequeuedFrameSetpp != NULL) { - delete dequeuedFrameSetpp; - dequeuedFrameSetpp = NULL; - } -} - -void InputLink::Cleanup() -{ - { - lock l(linkLock); - if (disposed) - return; - - disposed = true; - - // if the asyncHelper exists and is idle, unblock it - if (asyncHelperWaitHandle != nullptr) { - asyncHelperWaitHandle->Set(); - } - - // wakeup anyone waiting for messages - if (queuePtrp != NULL) - (*queuePtrp)->close(); - - // wait for any sync operations on the subscription to complete before ReleaseNative - lock l2(subscriptionLock); - - try {} - finally - { - ReleaseNative(); - } - } - - // Now that subscription is torn down, we can execute pending delete on remote node - qpidAddress->CleanupLink(amqpSession); - amqpSession->NotifyClosed(); -} - -InputLink::~InputLink() -{ - Cleanup(); -} - -InputLink::!InputLink() -{ - finalizing = true; - Cleanup(); -} - -void InputLink::Close() -{ - // Simulate Dispose()... - Cleanup(); - GC::SuppressFinalize(this); -} - -// call with lock held -bool InputLink::haveMessage() -{ - if (dequeuedFrameSetpp != NULL) - return true; - - if (queuePtrp != NULL) { - if ((*queuePtrp)->size() > 0) - return true; - } - return false; -} - -IntPtr InputLink::nextLocalMessage() -{ - lock l(linkLock); - - if (disposed) - return (IntPtr) NULL; - - // A message already pulled off BlockingQueue? - if (dequeuedFrameSetpp != NULL) { - QpidFrameSetPtr* rv = dequeuedFrameSetpp; - dequeuedFrameSetpp = NULL; - return (IntPtr) rv; - } - - if ((*queuePtrp)->empty()) - return (IntPtr) NULL; - - bool received = false; - QpidFrameSetPtr* frameSetpp = new QpidFrameSetPtr; - - try { - received = (*queuePtrp)->pop(*frameSetpp, qpid::sys::TIME_INFINITE); - if (received) { - QpidFrameSetPtr* rv = frameSetpp; - // no need to free native in finally block - frameSetpp = NULL; - return (IntPtr) rv; - } - } catch(const std::exception& error) { - // should be no async tampering with queue since we hold the lock and have a - // smart pointer ref to the native LocalQueue, even if the network connection fails... - cout << "unknown exception in InputLink.nextLocalMessage() " << error.what() <close(); -} - - - -// Set things right after unblockWaiter(). Closing and opening a Qpid BlockingQueue unsticks -// a blocking thread without interefering with queue contents or the ability to push -// new incoming messages. - -void InputLink::resetQueue() -{ - lock l(linkLock); - if (disposed) - return; - if ((*queuePtrp)->isClosed()) { - (*queuePtrp)->open(); - } -} - - -// returns true if there is a message to consume, i.e. nextLocalMessage() won't block - -bool InputLink::internalWaitForMessage() -{ - Demux::QueuePtr demuxQueuePtr; - - bool received = false; - QpidFrameSetPtr* frameSetpp = NULL; - try { - lock l(linkLock); - if (disposed) - return false; - if (haveMessage()) - return true; - - AdjustCredit(); - - // get a scoped smart ptr ref to guard against async close or hangup - demuxQueuePtr = *queuePtrp; - frameSetpp = new QpidFrameSetPtr; - - l.release(); - // Async cleanup is now possible. Only use demuxQueuePtr until lock reacquired. - received = demuxQueuePtr->pop(*frameSetpp, qpid::sys::TIME_INFINITE); - l.acquire(); - - if (received) { - dequeuedFrameSetpp = frameSetpp; - frameSetpp = NULL; // native will eventually be freed in Cleanup or MessageBodyStream - } - - return true; - } catch(const std::exception& ) { - // timeout or connection closed - return false; - } - finally { - if (frameSetpp != NULL) { - delete frameSetpp; - } - } - - return false; -} - - -// call with lock held -void InputLink::addWaiter(MessageWaiter^ waiter) -{ - waiters->Add(waiter); - if (waiters->Count == 1) { - // mark this waiter as ready to run - // Only the waiter at the head of the queue is active. - waiter->Activate(); - } - - if (waiter->Assigned) - return; - - if (asyncHelperWaitHandle == nullptr) { - asyncHelperWaitHandle = gcnew ManualResetEvent(false); - ThreadStart^ threadDelegate = gcnew ThreadStart(this, &InputLink::asyncHelper); - (gcnew Thread(threadDelegate))->Start(); - } - - if (waiters->Count == 1) { - // wake up the asyncHelper - asyncHelperWaitHandle->Set(); - } -} - - -void InputLink::removeWaiter(MessageWaiter^ waiter) { - // a waiter can be removed from anywhere in the list if timed out - - lock l(linkLock); - int idx = waiters->IndexOf(waiter); - if (idx == -1) { - // TODO: assert or log - if (asyncHelperWaitHandle != nullptr) { - // just in case. - asyncHelperWaitHandle->Set(); - } - return; - } - - waiters->RemoveAt(idx); - if (waiter->TimedOut) { - // may have to give back message if it arrives momentarily - AdjustCredit(); - } - - // let the next waiter know it's his turn. - if (waiters->Count > 0) { - MessageWaiter^ nextWaiter = waiters[0]; - - // wakeup the asyncHelper thread to help out if necessary. - if (!nextWaiter->Assigned) { - asyncHelperWaitHandle->Set(); - } - - l.release(); - nextWaiter->Activate(); - return; - } - else { - if (disposed && (asyncHelperWaitHandle != nullptr)) { - asyncHelperWaitHandle->Set(); - } - } -} - - -void InputLink::asyncHelper() -{ - lock l(linkLock); - - while (true) { - if (disposed && (waiters->Count == 0)) { - asyncHelperWaitHandle = nullptr; - return; - } - - if (waiters->Count > 0) { - MessageWaiter^ waiter = waiters[0]; - - l.release(); - if (waiter->AcceptForWork()) { - waiter->Run(); - } - l.acquire(); - } - - // sleep if more work may be coming or it is currently someone else's turn - if (((waiters->Count == 0) && !disposed) || ((waiters->Count != 0) && waiters[0]->Assigned)) { - // wait for something to do - asyncHelperWaitHandle->Reset(); - l.release(); - asyncHelperWaitHandle->WaitOne(); - l.acquire(); - } - } -} - -void InputLink::sync() -{ - // used by the MessageWaiter timeout thread to not run before fully initialized - lock l(linkLock); -} - - -void InputLink::PrefetchLimit::set(int value) -{ - lock l(linkLock); - prefetchLimit = value; - - int delta = 0; - - // rough rule of thumb to keep the flow, but reduce chatter. - // for small messages, the credit request is almost as expensive as the transfer itself. - // experience may suggest a better heuristic or require a property for the low water mark - if (prefetchLimit >= 3) { - delta = prefetchLimit / 3; - } - minWorkingCredit = prefetchLimit - delta; - AdjustCredit(); -} - - -// call with lock held -void InputLink::AdjustCredit() -{ - if (creditSyncPending || disposed) - return; - - // low watermark check - if ((prefetchLimit != 0) && - (workingCredit >= minWorkingCredit) && - (workingCredit >= waiters->Count)) - return; - - // should have enough for all waiters or to satisfy the prefetch window - int targetCredit = waiters->Count; - if (targetCredit < prefetchLimit) - targetCredit = prefetchLimit; - - if (targetCredit > workingCredit) { - subscriptionp->grantMessageCredit(targetCredit - workingCredit); - workingCredit = targetCredit; - return; - } - if (targetCredit < workingCredit) { - if ((targetCredit == 0) && (prefetchLimit == 0)) { - creditSyncPending = true; - ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit)); - } - // TODO: also shrink credit when prefetchLimit != 0 - } -} - -void InputLink::SyncCredit(Object ^unused) -{ - lock l(linkLock); - - try { - if (disposed) - return; - - if (!amqpSession->MessageStop(subscriptionp->getName())) { - // connection closed - return; - } - - l.release(); - // use setFlowControl to re-enable credit flow on the broker. - // setFlowControl is a sync operation - { - lock l2(subscriptionLock); - if (subscriptionp != NULL) { - subscriptionp->setFlowControl(subscriptionp->getSettings().flowControl); - } - } - l.acquire(); - - if (disposed) - return; - - // let existing waiters use up any messages that arrived. - // local queue size can only decrease until more credit is issued - while (true) { - if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) { - l.release(); - // a rare use case and not used in performance oriented code. - // optimization can wait until the qpid/messaging api is used - Thread::Sleep(10); - l.acquire(); - if (disposed) - return; - } - else { - break; - } - } - - // At this point, the lock is held and we are fully synced with the broker - // so we have a valid snapshot - - if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) { - // can't be sure application will request a message again any time soon - QpidFrameSetPtr frameSetp; - while (!(*queuePtrp)->empty()) { - (*queuePtrp)->pop(frameSetp); - SequenceSet frameSetID(frameSetp->getId()); - subscriptionp->release(frameSetID); - } - - // don't touch dequeuedFrameSetpp. It is spoken for: explicitely from a - // MessageWaiter about to to get the nextLocalMessage(), or implicitely - // from a WaitForMessage(). - } - // TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit - - workingCredit = (*queuePtrp)->size(); - if (dequeuedFrameSetpp != NULL) { - workingCredit++; - } - } - finally { - creditSyncPending = false; - } - - AdjustCredit(); -} - - -AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp) -{ - QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer(); - bool ownFrameSet = true; - bool haveProperties = false; - - try { - MessageBodyStream^ mstream = gcnew MessageBodyStream(fspp); - ownFrameSet = false; // stream releases on close/dispose - - AmqpMessage^ amqpMessage = gcnew AmqpMessage(mstream); - - AMQHeaderBody* headerBodyp = (*fspp)->getHeaders(); - uint64_t contentSize = (*fspp)->getContentSize(); - SequenceSet frameSetID((*fspp)->getId()); - - // target managed representation - AmqpProperties^ amqpProperties = gcnew AmqpProperties(); - - // source native representation - const DeliveryProperties* deliveryProperties = headerBodyp->get(); - const qpid::framing::MessageProperties* messageProperties = headerBodyp->get(); - - if (deliveryProperties) { - if (deliveryProperties->hasRoutingKey()) { - haveProperties = true; - - amqpProperties->RoutingKey = gcnew String(deliveryProperties->getRoutingKey().c_str()); - } - - if (deliveryProperties->hasDeliveryMode()) { - if (deliveryProperties->getDeliveryMode() == qpid::framing::PERSISTENT) - amqpProperties->Durable = true; - } - - if (deliveryProperties->hasTtl()) { - long long ticks = deliveryProperties->getTtl() * TimeSpan::TicksPerMillisecond; - amqpProperties->TimeToLive = Nullable(TimeSpan::FromTicks(ticks)); - } - } - - if (messageProperties) { - - if (messageProperties->hasReplyTo()) { - haveProperties = true; - const ReplyTo& rpto = messageProperties->getReplyTo(); - String^ rk = nullptr; - String^ ex = nullptr; - if (rpto.hasRoutingKey()) { - rk = gcnew String(rpto.getRoutingKey().c_str()); - } - if (rpto.hasExchange()) { - ex = gcnew String(rpto.getExchange().c_str()); - } - amqpProperties->SetReplyTo(ex,rk); - } - - if (messageProperties->hasContentType()) { - haveProperties = true; - amqpProperties->ContentType = gcnew String(messageProperties->getContentType().c_str()); - - if (messageProperties->hasContentEncoding()) { - String^ enc = gcnew String(messageProperties->getContentEncoding().c_str()); - if (!String::IsNullOrEmpty(enc)) { - // TODO: properly assemble 1.0 style to 0-10 for all cases - amqpProperties->ContentType += "; charset=" + enc; - } - } - } - - if (messageProperties->hasCorrelationId()) { - haveProperties = true; - const std::string& ncid = messageProperties->getCorrelationId(); - int len = ncid.size(); - array^ mcid = gcnew array(len); - Marshal::Copy ((IntPtr) (void *) ncid.data(), mcid, 0, len); - amqpProperties->CorrelationId = mcid; - } - - if (messageProperties->hasUserId()) { - haveProperties = true; - const std::string& nuid = messageProperties->getUserId(); - int len = nuid.size(); - array^ muid = gcnew array(len); - Marshal::Copy ((IntPtr) (void *) nuid.data(), muid, 0, len); - amqpProperties->UserId = muid; - } - - if (messageProperties->hasApplicationHeaders()) { - haveProperties = true; - const qpid::framing::FieldTable& fieldTable = messageProperties->getApplicationHeaders(); - int count = fieldTable.count(); - - if (count > 0) { - haveProperties = true; - Collections::Generic::Dictionary^ mmap = - gcnew Collections::Generic::Dictionary(count); - - for(qpid::framing::FieldTable::ValueMap::const_iterator i = fieldTable.begin(); i != fieldTable.end(); i++) { - - qpid::framing::FieldValue::Data &data = i->second->getData(); - - // TODO: replace these generic int/string conversions with handler for each AMQP specific type: - // uint8_t dataType = i->second->getType(); - // switch (dataType) { case TYPE_CODE_STR8: ... } - - if (data.convertsToInt()) { - mmap->Add (gcnew String(i->first.data()), gcnew AmqpInt((int) i->second->getData().getInt())); - } - if (data.convertsToString()) { - std::string ns = data.getString(); - String^ ms = gcnew String(ns.data(), 0, ns.size()); - mmap->Add (gcnew String(i->first.data()), gcnew AmqpString(ms)); - } - } - - amqpProperties->PropertyMap = mmap; - } - - } - } - - if (haveProperties) { - amqpMessage->Properties = amqpProperties; - } - - // We have a message we can return to the caller. - // Tell the broker we got it. - - // subscriptionp->accept(frameSetID) is a slow sync operation in the native API - // so do it within the AsyncSession directly - amqpSession->AcceptAndComplete(frameSetID, browsing); - - workingCredit--; - // check if more messages need to be requested from broker - AdjustCredit(); - - return amqpMessage; - } - finally { - if (ownFrameSet) - delete (fspp); - } -} - - // As for IInputChannel: - // if success, return true + amqpMessage - // elseif timeout, return false - // elseif closed/EOF, return true and amqpMessage = null - // else throw an Exception - -bool InputLink::TryReceive(TimeSpan timeout, [Out] AmqpMessage^% amqpMessage) -{ - lock l(linkLock); - - if (waiters->Count == 0) { - // see if there is a message already available without blocking - IntPtr fspp = nextLocalMessage(); - if (fspp.ToPointer() != NULL) { - amqpMessage = createAmqpMessage(fspp); - return true; - } - } - - MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, false, nullptr, nullptr); - addWaiter(waiter); - - l.release(); - waiter->Run(); - l.acquire(); - - if (waiter->TimedOut) { - return false; - } - - IntPtr waiterMsg = waiter->Message; - if (waiterMsg.ToPointer() == NULL) { - if (disposed) { - // indicate normal EOF on channel - amqpMessage = nullptr; - return true; - } - } - - amqpMessage = createAmqpMessage(waiterMsg); - return true; -} - -IAsyncResult^ InputLink::BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state) -{ - - //TODO: if haveMessage() complete synchronously - - lock l(linkLock); - MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, true, callback, state); - addWaiter(waiter); - return waiter; -} - -bool InputLink::EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage) -{ - - // TODO: validate result - - MessageWaiter^ waiter = (MessageWaiter ^) result; - - waiter->WaitForCompletion(); - - if (waiter->RunException != nullptr) - throw waiter->RunException; - - if (waiter->TimedOut) { - amqpMessage = nullptr; - return false; - } - - IntPtr waiterMsg = waiter->Message; - if (waiterMsg.ToPointer() == NULL) { - if (disposed) { - // indicate normal EOF on channel - amqpMessage = nullptr; - return true; - } - } - - amqpMessage = createAmqpMessage(waiterMsg); - return true; -} - - -bool InputLink::WaitForMessage(TimeSpan timeout) -{ - lock l(linkLock); - - if (disposed) - return false; - - if (waiters->Count == 0) { - // see if there is a message already available without blocking - if (haveMessage()) - return true; - } - - // Same as for TryReceive, except consuming = false - MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, false, nullptr, nullptr); - addWaiter(waiter); - - l.release(); - waiter->Run(); - l.acquire(); - - if (waiter->TimedOut) { - return false; - } - - return haveMessage(); -} - -IAsyncResult^ InputLink::BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state) -{ - lock l(linkLock); - - // Same as for BeginTryReceive, except consuming = false - MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, true, callback, state); - addWaiter(waiter); - return waiter; -} - -bool InputLink::EndWaitForMessage(IAsyncResult^ result) -{ - MessageWaiter^ waiter = (MessageWaiter ^) result; - - waiter->WaitForCompletion(); - - if (waiter->TimedOut) { - return false; - } - - return haveMessage(); -} - - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h deleted file mode 100644 index 136d53d280..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h +++ /dev/null @@ -1,110 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#pragma once - -#include "MessageWaiter.h" -#include "QpidAddress.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Threading; -using namespace System::Runtime::InteropServices; - -using namespace qpid::client; -using namespace std; - -// smart pointer to the low level AMQP 0-10 frames of the message -typedef qpid::framing::FrameSet::shared_ptr QpidFrameSetPtr; - -public ref class InputLink -{ -private: - AmqpSession^ amqpSession; - Subscription* subscriptionp; - LocalQueue* localQueuep; - Demux::QueuePtr* queuePtrp; - Collections::Generic::List^ waiters; - bool disposed; - bool finalizing; - Object^ linkLock; - Object^ subscriptionLock; - QpidFrameSetPtr* dequeuedFrameSetpp; - ManualResetEvent^ asyncHelperWaitHandle; - // number of messages to buffer locally for future consumption - int prefetchLimit; - // the number of messages requested and not yet processed - int workingCredit; - // stopping and restarting the message flow - bool creditSyncPending; - // working credit low water mark - int minWorkingCredit; - - bool browsing; - QpidAddress^ qpidAddress; - - void Cleanup(); - void ReleaseNative(); - bool haveMessage(); - void addWaiter(MessageWaiter^ waiter); - void asyncHelper(); - AmqpMessage^ createAmqpMessage(IntPtr msgp); - void AdjustCredit(); - void SyncCredit(Object ^); - -internal: - InputLink(AmqpSession^ session, System::String^ sourceQueue, qpid::client::AsyncSession *qpidSessionp, - qpid::client::SubscriptionManager *qpidSubsMgrp, bool exclusive, bool temporary, System::String^ filterKey, - System::String^ exchange); - - bool internalWaitForMessage(); - void unblockWaiter(); - void resetQueue(); - IntPtr nextLocalMessage(); - void removeWaiter(MessageWaiter^ waiter); - void sync(); - -public: - ~InputLink(); - !InputLink(); - void Close(); - - bool TryReceive(TimeSpan timeout, [Out] AmqpMessage ^% amqpMessage); - IAsyncResult^ BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state); - bool EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage); - - bool WaitForMessage(TimeSpan timeout); - IAsyncResult^ BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state); - bool EndWaitForMessage(IAsyncResult^ result); - - property int PrefetchLimit { - int get () { return prefetchLimit; } - void set (int value); - } - - property bool Browsing { - bool get () { return browsing; } - } - -}; - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj deleted file mode 100644 index fe288cbe76..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj +++ /dev/null @@ -1,501 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp deleted file mode 100644 index f2cb5740d3..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp +++ /dev/null @@ -1,337 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#include -#include - -#include "qpid/client/AsyncSession.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/framing/AMQFrame.h" - -#include "MessageBodyStream.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Runtime::InteropServices; -using namespace System::Threading; -using namespace msclr; - -using namespace qpid::client; -using namespace qpid::framing; - -// Thefolowing def must match "Frames" private typedef. -// TODO: make "Frames" publicly visible. -typedef qpid::InlineVector FrameSetFrames; - -using namespace std; - -static void ThrowIfBadArgs (array^ buffer, int offset, int count) -{ - if (buffer == nullptr) - throw gcnew ArgumentNullException("buffer"); - - if (offset < 0) - throw gcnew ArgumentOutOfRangeException("offset"); - - if (count < 0) - throw gcnew ArgumentOutOfRangeException("count"); - - if ((offset + count) > buffer->Length) - throw gcnew ArgumentException("offset + count"); -} - - -// Input stream constructor - -MessageBodyStream::MessageBodyStream(FrameSet::shared_ptr *fspp) -{ - isInputStream = true; - frameSetpp = fspp; - fragmentCount = 0; - length = 0; - position = 0; - currentFramep = NULL; - - const std::string *datap; // pointer to the fragment's string variable that holds the content - - for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) { - if (i->getBody()->type() == CONTENT_BODY) { - fragmentCount++; - datap = &(i->castBody()->getData()); - length += datap->size(); - } - } - - // fragmentCount can be zero for an empty message - - fragmentIndex = 0; - fragmentPosition = 0; - - if (fragmentCount == 0) { - currentFragment = NULL; - fragmentLength = 0; - } - else if (fragmentCount == 1) { - currentFragment = datap->data(); - fragmentLength = (int) length; - } - else { - fragments = gcnew array(fragmentCount); - fragmentIndex = 0; - for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) { - if (i->getBody()->type() == CONTENT_BODY) { - datap = &(i->castBody()->getData()); - fragments[fragmentIndex++] = (IntPtr) (void *) datap; - } - } - fragmentIndex = 0; - datap = (const std::string *) fragments[0].ToPointer(); - currentFragment = datap->data(); - fragmentLength = datap->size(); - } -} - - -int MessageBodyStream::Read(array^ buffer, int offset, int count) -{ - if (!isInputStream) - throw gcnew NotSupportedException(); - if (disposed) - throw gcnew ObjectDisposedException("Stream"); - if (count == 0) - return 0; - ThrowIfBadArgs(buffer, offset, count); - - int nRead = 0; - int remaining = count; - - while (nRead < count) { - int fragAvail = fragmentLength - fragmentPosition; - int copyCount = min (fragAvail, remaining); - if (copyCount == 0) { - // no more to read - return nRead; - } - - // copy from native space - IntPtr nativep = (IntPtr) (void *) (currentFragment + fragmentPosition); - Marshal::Copy (nativep, buffer, offset, copyCount); - nRead += copyCount; - remaining -= copyCount; - fragmentPosition += copyCount; - offset += copyCount; - - // advance to next fragment? - if (fragmentPosition == fragmentLength) { - if (++fragmentIndex < fragmentCount) { - const std::string *datap = (const std::string *) fragments[fragmentIndex].ToPointer(); - currentFragment = datap->data(); - fragmentLength = datap->size(); - fragmentPosition = 0; - } - } - } - - return nRead; -} - - -void MessageBodyStream::pushCurrentFrame(bool lastFrame) -{ - // set flags as in SessionImpl::sendContent. - if (currentFramep->getBody()->type() == CONTENT_BODY) { - - if ((fragmentCount == 1) && lastFrame) { - // only one content frame - currentFramep->setFirstSegment(false); - } - else { - currentFramep->setFirstSegment(false); - currentFramep->setLastSegment(true); - if (fragmentCount != 1) { - currentFramep->setFirstFrame(false); - } - if (!lastFrame) { - currentFramep->setLastFrame(false); - } - } - } - else { - // the header frame - currentFramep->setFirstSegment(false); - if (!lastFrame) { - // there will be at least one content frame - currentFramep->setLastSegment(false); - } - } - - // add to frame set. This makes a copy and ref counts the body - (*frameSetpp)->append(*currentFramep); - - delete currentFramep; - - currentFramep = NULL; -} - - -IntPtr MessageBodyStream::GetFrameSet() -{ - if (currentFramep != NULL) { - // No more content. Tidy up the pending (possibly single header) frame. - pushCurrentFrame(true); - } - - if (frameSetpp == NULL) { - return (IntPtr) NULL; - } - - // shared_ptr.get() - return (IntPtr) (void *) (*frameSetpp).get(); -} - -IntPtr MessageBodyStream::GetHeader() -{ - return (IntPtr) headerBodyp; -} - - -// Ouput stream constructor - -MessageBodyStream::MessageBodyStream(int maxFrameSize) -{ - isInputStream = false; - - maxFrameContentSize = maxFrameSize - AMQFrame::frameOverhead(); - SequenceNumber unused; // only meaningful on incoming frames - frameSetpp = new FrameSet::shared_ptr(new FrameSet(unused)); - fragmentCount = 0; - length = 0; - position = 0; - - // header goes first in the outgoing frameset - - boost::intrusive_ptr headerBody(new AMQHeaderBody); - currentFramep = new AMQFrame(headerBody); - headerBodyp = static_cast(headerBody.get()); - - // mark this header frame as "full" to force the first write to create a new content frame - fragmentPosition = maxFrameContentSize; -} - -void MessageBodyStream::Write(array^ buffer, int offset, int count) -{ - if (isInputStream) - throw gcnew NotSupportedException(); - if (disposed) - throw gcnew ObjectDisposedException("Stream"); - if (count == 0) - return; - ThrowIfBadArgs(buffer, offset, count); - - if (currentFramep == NULL) { - // GetFrameSet() has been called and we no longer exclusively own the underlying frames. - throw gcnew InvalidOperationException ("Mesage Body output already completed"); - } - - if (count <= 0) - return; - - // keep GC memory movement at bay while copying to native space - pin_ptr pinnedBuf = &buffer[0]; - - string *datap; - - int remaining = count; - while (remaining > 0) { - if (fragmentPosition == maxFrameContentSize) { - // move to a new frame, but not until ready to add new content. - // zero content is valid, or the final write may exactly fill to maxFrameContentSize - - pushCurrentFrame(false); - - currentFramep = new AMQFrame(AMQContentBody()); - fragmentPosition = 0; - fragmentCount++; - } - - int copyCount = min (remaining, (maxFrameContentSize - fragmentPosition)); - datap = &(currentFramep->castBody()->getData()); - - char *outp = (char *) pinnedBuf + offset; - if (fragmentPosition == 0) { - datap->assign(outp, copyCount); - } - else { - datap->append(outp, copyCount); - } - - position += copyCount; - fragmentPosition += copyCount; - remaining -= copyCount; - offset += copyCount; - } -} - - -void MessageBodyStream::Cleanup() -{ - { - lock l(this); - if (disposed) - return; - - disposed = true; - } - - try {} - finally - { - if (frameSetpp != NULL) { - delete frameSetpp; - frameSetpp = NULL; - } - if (currentFramep != NULL) { - delete currentFramep; - currentFramep = NULL; - } - } -} - -MessageBodyStream::~MessageBodyStream() -{ - Cleanup(); -} - -MessageBodyStream::!MessageBodyStream() -{ - Cleanup(); -} - -void MessageBodyStream::Close() -{ - // Simulate Dispose()... - Cleanup(); - GC::SuppressFinalize(this); -} - - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h deleted file mode 100644 index fa8e3f6bde..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h +++ /dev/null @@ -1,131 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#pragma once - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Runtime::InteropServices; - -using namespace qpid::client; -using namespace qpid::framing; -using namespace std; - - -// This class provides memory streaming of the message body contents -// between native and managed space. To avoid additional memory copies -// in native space, it reads and writes directly to the low level Qpid -// frames. - -public ref class MessageBodyStream : System::IO::Stream -{ -private: - bool isInputStream; - long long length; - long long position; - - // the boost smart pointer that keeps the message body frames in memory - FrameSet::shared_ptr *frameSetpp; - - int fragmentCount; - int fragmentIndex; - const char* currentFragment; - int fragmentPosition; - int fragmentLength; - array^ fragments; - - int maxFrameContentSize; - AMQFrame* currentFramep; - void* headerBodyp; - bool disposed; - bool finalizing; - void Cleanup(); - -internal: - // incoming message - MessageBodyStream(FrameSet::shared_ptr *fspp); - // outgoing message - MessageBodyStream(int maxFrameSize); - void pushCurrentFrame(bool last); -public: - ~MessageBodyStream(); - !MessageBodyStream(); - virtual void Close() override; - virtual int Read( - [InAttribute] [OutAttribute] array^ buffer, - int offset, - int count) override; - - virtual void Write( - array^ buffer, - int offset, - int count) override; - - - IntPtr GetFrameSet(); - IntPtr GetHeader(); - - virtual void Flush() override {} // noop - - - // TODO: see CanSeek below. - virtual long long Seek( - long long offset, - System::IO::SeekOrigin origin) override {throw gcnew System::NotSupportedException(); } - - // TODO: see CanSeek below. - virtual void SetLength( - long long value) override {throw gcnew System::NotSupportedException(); } - - virtual property long long Length { - long long get() override { return length; } - }; - - virtual property long long Position { - long long get() override { return position; } - void set(long long p) override { throw gcnew System::NotSupportedException(); } - }; - - - virtual property bool CanRead { - bool get () override { return isInputStream; } - } - - virtual property bool CanWrite { - bool get () override { return !isInputStream; } - } - - // Note: this class must return true to signal that the Length property works. - // Required by the raw message encoder. - // "If a class derived from Stream does not support seeking, calls to Length, - // SetLength, Position, and Seek throw a NotSupportedException". - - virtual property bool CanSeek { - bool get () override { return true; } - } - - virtual property bool CanTimeout { - bool get () override { return isInputStream; } - } -}; - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp deleted file mode 100644 index f7a28b0692..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp +++ /dev/null @@ -1,251 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#include -#include - -#include "qpid/client/AsyncSession.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Message.h" -#include "qpid/client/MessageListener.h" -#include "qpid/client/Demux.h" -#include "qpid/client/SessionImpl.h" -#include "qpid/client/SessionBase_0_10Access.h" - -#include "MessageBodyStream.h" -#include "AmqpMessage.h" -#include "AmqpSession.h" -#include "InputLink.h" -#include "MessageWaiter.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Threading; -using namespace msclr; - - -MessageWaiter::MessageWaiter(InputLink^ parent, TimeSpan timeSpan, bool consuming, bool async, AsyncCallback ^callback, Object^ state) -{ - this->consuming = consuming; - if (!consuming) { - GC::SuppressFinalize(this); - } - - if (async) { - this->async = true; - this->asyncCallback = callback; - this->state = state; - } - else { - this->assigned = true; - } - this->parent = parent; - this->thisLock = gcnew Object(); - - // do this after the Message Waiter is fully initialized, in case of - // very small timespan - if (timeSpan != TimeSpan::MaxValue) { - this->timer = gcnew Timer(timeoutCallback, this, timeSpan, TimeSpan::FromMilliseconds(-1)); - } -} - -MessageWaiter::~MessageWaiter() -{ - if (message != IntPtr::Zero) { - try{} - finally { - delete message.ToPointer(); - message = IntPtr::Zero; - } - } -} - -MessageWaiter::!MessageWaiter() -{ - this->~MessageWaiter(); -} - - -void MessageWaiter::WaitForCompletion() -{ - if (isCompleted) - return; - - lock l(thisLock); - while (!isCompleted) { - Monitor::Wait(thisLock); - } -} - -void MessageWaiter::Activate() -{ - if (activated) - return; - - lock l(thisLock); - if (!activated) { - activated = true; - Monitor::PulseAll(thisLock); - } -} - - -void MessageWaiter::Run() -{ - lock l(thisLock); - - // wait until Activate(), i.e. our turn in the waiter list or a timeout - while (!activated) { - Monitor::Wait(thisLock); - } - bool haveMessage = false; - bool mustReset = false; - - if (!timedOut) - blocking = true; - - if (blocking) { - l.release(); - - try { - haveMessage = parent->internalWaitForMessage(); - } - catch (System::Exception^ e) { - runException = e; - } - - l.acquire(); - blocking = false; - if (timedOut) { - // TimeoutCallback() called parent->unblockWaiter() - mustReset = true; - // let the timer thread move past critical region - while (processingTimeout) { - Monitor::Wait(thisLock); - } - } - } - - if (timer != nullptr) { - timer->~Timer(); - timer = nullptr; - } - - if (haveMessage) { - timedOut = false; // for the case timeout and message arrival are essentially tied - if (!consuming) { - // just waiting - haveMessage = false; - } - } - - if (haveMessage || mustReset) { - l.release(); - if (haveMessage) { - // hang on to it for when the async caller gets around to retrieving - message = parent->nextLocalMessage(); - } - if (mustReset) { - parent->resetQueue(); - } - l.acquire(); - } - - isCompleted = true; - Monitor::PulseAll(thisLock); - - // do this check and signal while locked - if (asyncWaitHandle != nullptr) - asyncWaitHandle->Set(); - - l.release(); - parent->removeWaiter(this); - - - if (asyncCallback != nullptr) { - // guard against application callback exception - try { - asyncCallback(this); - } - catch (System::Exception^) { - // log it? - } - } - -} - -bool MessageWaiter::AcceptForWork() -{ - lock l(thisLock); - if (!assigned) { - assigned = true; - return true; - } - return false; -} - -void MessageWaiter::TimeoutCallback(Object^ state) -{ - MessageWaiter^ waiter = (MessageWaiter^) state; - if (waiter->isCompleted) - return; - - // make sure parent has finished initializing us before we get going - waiter->parent->sync(); - - lock l(waiter->thisLock); - if (waiter->timer == nullptr) { - // the waiter is in the clean up phase and doesn't need a wakeup - return; - } - - // timedOut, blocking and processingTimeout work as a unit - waiter->timedOut = true; - if (waiter->blocking) { - // let the waiter know that we are busy with an upcoming unblock operation - waiter->processingTimeout = true; - } - - waiter->Activate(); - - if (waiter->processingTimeout) { - // call with lock off - l.release(); - waiter->parent->unblockWaiter(); - - // synchronize with blocked thread - l.acquire(); - waiter->processingTimeout = false; - Monitor::PulseAll(waiter->thisLock); - } - - l.release(); - - // If waiter has no associated thread, we must move it to completion - if (waiter->AcceptForWork()) { - waiter->Run(); // does not block since timedOut == true - } -} - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h deleted file mode 100644 index 3737430844..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h +++ /dev/null @@ -1,125 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#pragma once - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Threading; - -public ref class MessageWaiter : IAsyncResult -{ -private: - // Receive() or WaitForMessage() - bool consuming; - bool consumed; - bool timedOut; - bool async; - // has an owner thread - bool assigned; - // can Run (i.e. earlier MessageWaiters in the queue have completed) - bool activated; - // is making a call to internalWaitForMessage() which (usually) blocks - bool blocking; - // the timeout timer thread is lurking - bool processingTimeout; - // the saved exception from within Run() for async delivery - System::Exception^ runException; - AsyncCallback^ asyncCallback; - Threading::Timer ^timer; - bool isCompleted; - bool completedSynchronously; - Object^ state; - Object^ thisLock; - ManualResetEvent^ asyncWaitHandle; - InputLink^ parent; - static void TimeoutCallback(Object^ state); - static TimerCallback^ timeoutCallback = gcnew TimerCallback(MessageWaiter::TimeoutCallback); - IntPtr message; - !MessageWaiter(); - ~MessageWaiter(); - - internal: - MessageWaiter(InputLink^ parent, TimeSpan timeSpan, bool consuming, bool async, AsyncCallback ^callback, Object^ state); - - void Run(); - bool AcceptForWork(); - void Activate(); - void WaitForCompletion(); - - - property IntPtr Message { - IntPtr get () { - if (!consuming || consumed) - throw gcnew InvalidOperationException("Message property"); - consumed = true; - IntPtr v = message; - message = IntPtr::Zero; - GC::SuppressFinalize(this); - return v; - } - } - - property bool Assigned { - bool get () { return assigned; } - } - - property bool TimedOut { - bool get () { return timedOut; } - } - - property System::Exception^ RunException { - System::Exception^ get() { return runException; } - } - - - public: - - virtual property bool IsCompleted { - bool get () { return isCompleted; } - } - - virtual property bool CompletedSynchronously { - bool get () { return completedSynchronously; } - } - - virtual property WaitHandle^ AsyncWaitHandle { - WaitHandle^ get () { - if (asyncWaitHandle != nullptr) { - return asyncWaitHandle; - } - - msclr::lock l(thisLock); - if (asyncWaitHandle == nullptr) { - asyncWaitHandle = gcnew ManualResetEvent(isCompleted); - } - return asyncWaitHandle; - } - } - - virtual property Object^ AsyncState { - Object^ get () { return state; } - } -}; - -}}} // namespace Apache::Qpid::Interop - diff --git a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp deleted file mode 100644 index de7141dadb..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp +++ /dev/null @@ -1,255 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#include -#include - -#include "qpid/client/AsyncSession.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Message.h" -#include "qpid/client/MessageListener.h" - - -#include "AmqpSession.h" -#include "AmqpMessage.h" -#include "OutputLink.h" -#include "QpidMarshal.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Runtime::InteropServices; -using namespace System::Threading; -using namespace msclr; - -using namespace qpid::client; -using namespace std; - -using namespace Apache::Qpid::AmqpTypes; - - -OutputLink::OutputLink(AmqpSession^ session, String^ address) : - amqpSession(session), - disposed(false), - maxFrameSize(session->Connection->MaxFrameSize), - finalizing(false) -{ - qpidAddress = QpidAddress::CreateAddress(address, false); - qpidAddress->ResolveLink(session); -} - -void OutputLink::Cleanup() -{ - { - lock l(this); - if (disposed) - return; - - disposed = true; - } - - // process any pending queue delete - qpidAddress->CleanupLink(amqpSession); - amqpSession->NotifyClosed(); -} - -OutputLink::~OutputLink() -{ - Cleanup(); -} - -OutputLink::!OutputLink() -{ - Cleanup(); -} - -void OutputLink::Close() -{ - // Simulate Dispose()... - Cleanup(); - GC::SuppressFinalize(this); -} - - -AmqpMessage^ OutputLink::CreateMessage() -{ - MessageBodyStream ^mbody = gcnew MessageBodyStream(maxFrameSize); - AmqpMessage ^amqpm = gcnew AmqpMessage(mbody); - return amqpm; -} - - -void OutputLink::ManagedToNative(AmqpMessage^ m) -{ - MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) m->BodyStream; - - AmqpProperties^ mprops = m->Properties; - - if (mprops != nullptr) { - AMQHeaderBody* bodyp = (AMQHeaderBody*) messageBodyStream->GetHeader().ToPointer(); - - if (mprops->HasDeliveryProperties) { - DeliveryProperties* deliveryPropertiesp = bodyp->get(true); - - if (mprops->RoutingKey != nullptr) { - deliveryPropertiesp->setRoutingKey(QpidMarshal::ToNative(mprops->RoutingKey)); - } - - if (mprops->Durable) { - deliveryPropertiesp->setDeliveryMode(qpid::framing::PERSISTENT); - } - - if (mprops->TimeToLive.HasValue) { - long long ttl = mprops->TimeToLive.Value.Ticks; - bool was_positive = (ttl > 0); - if (ttl < 0) - ttl = 0; - ttl = ttl / TimeSpan::TicksPerMillisecond; - if ((ttl == 0) && was_positive) - ttl = 1; - deliveryPropertiesp->setTtl(ttl); - } - } - - if (mprops->HasMessageProperties) { - qpid::framing::MessageProperties* messagePropertiesp = - bodyp->get(true); - - String^ replyToExchange = mprops->ReplyToExchange; - String^ replyToRoutingKey = mprops->ReplyToRoutingKey; - if ((replyToExchange != nullptr) || (replyToRoutingKey != nullptr)) { - qpid::framing::ReplyTo nReplyTo; - if (replyToExchange != nullptr) { - nReplyTo.setExchange(QpidMarshal::ToNative(replyToExchange)); - } - if (replyToRoutingKey != nullptr) { - nReplyTo.setRoutingKey(QpidMarshal::ToNative(replyToRoutingKey)); - } - messagePropertiesp->setReplyTo(nReplyTo); - } - - // TODO: properly split 1.0 style to 0-10 content type + encoding - - String^ contentType = mprops->ContentType; - if (contentType != nullptr) { - String^ type = nullptr; - String^ enc = nullptr; - int idx = contentType->IndexOf(';'); - if (idx == -1) { - type = contentType; - } - else { - type = contentType->Substring(0, idx); - contentType = contentType->Substring(idx + 1); - idx = contentType->IndexOf('='); - if (idx != -1) { - enc = contentType->Substring(idx + 1); - enc = enc->Trim(); - } - } - if (!String::IsNullOrEmpty(type)) { - messagePropertiesp->setContentType(QpidMarshal::ToNative(type)); - } - if (!String::IsNullOrEmpty(enc)) { - messagePropertiesp->setContentEncoding(QpidMarshal::ToNative(enc)); - } - } - - - array^ mbytes = mprops->CorrelationId; - if (mbytes != nullptr) { - pin_ptr pinnedBuf = &mbytes[0]; - std::string s((char *) pinnedBuf, mbytes->Length); - messagePropertiesp->setCorrelationId(s); - } - - mbytes = mprops->UserId; - if (mbytes != nullptr) { - pin_ptr pinnedBuf = &mbytes[0]; - std::string s((char *) pinnedBuf, mbytes->Length); - messagePropertiesp->setUserId(s); - } - - if (mprops->HasMappedProperties) { - qpid::framing::FieldTable fieldTable; - // TODO: add support for abitrary AMQP types - for each (Collections::Generic::KeyValuePair kvp in mprops->PropertyMap) { - Type^ type = kvp.Value->GetType(); - if (type == AmqpInt::typeid) { - fieldTable.setInt(QpidMarshal::ToNative(kvp.Key), - ((AmqpInt ^) kvp.Value)->Value); - } - else if (type == AmqpString::typeid) { - AmqpString^ str = (AmqpString ^) kvp.Value; - // For now, FieldTable supports a single string type - fieldTable.setString(QpidMarshal::ToNative(kvp.Key), QpidMarshal::ToNative(str->Value)); - } - } - - messagePropertiesp->setApplicationHeaders(fieldTable); - } - } - } -} - - - -void OutputLink::Send(AmqpMessage^ amqpMessage, TimeSpan timeout) -{ - // copy properties from managed space to the native counterparts - ManagedToNative(amqpMessage); - - MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream; - CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream, - timeout, false, nullptr, nullptr); - - if (waiter != nullptr) { - waiter->WaitForCompletion(); - if (waiter->TimedOut) { - throw gcnew TimeoutException("Receive"); - } - } - // else: SendMessage() has already waited for the Completion - -} - -IAsyncResult^ OutputLink::BeginSend(AmqpMessage^ amqpMessage, TimeSpan timeout, AsyncCallback^ callback, Object^ state) -{ - ManagedToNative(amqpMessage); - - MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream; - CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream, timeout, true, callback, state); - return waiter; -} - -void OutputLink::EndSend(IAsyncResult^ result) -{ - CompletionWaiter^ waiter = (CompletionWaiter ^) result; - waiter->WaitForCompletion(); - if (waiter->TimedOut) { - throw gcnew TimeoutException("Receive"); - } -} - - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h deleted file mode 100644 index e30d1cc79f..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h +++ /dev/null @@ -1,75 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#pragma once - -#include "QpidAddress.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Runtime::InteropServices; - -using namespace qpid::client; -using namespace std; - - -public ref class OutputLink -{ -private: - AmqpSession^ amqpSession; - QpidAddress^ qpidAddress; - bool disposed; - bool finalizing; - void Cleanup(); - AmqpTypes::AmqpProperties^ defaultProperties; - void ManagedToNative(AmqpMessage^ m); - int maxFrameSize; - -internal: - OutputLink(AmqpSession^ session, String^ defaultQueue); - -public: - ~OutputLink(); - !OutputLink(); - void Close(); - AmqpMessage^ CreateMessage(); - void Send(AmqpMessage^ m, TimeSpan timeout); - IAsyncResult^ BeginSend(AmqpMessage^ amqpMessage, TimeSpan timeout, AsyncCallback^ callback, Object^ state); - void EndSend(IAsyncResult^ result); - - property AmqpTypes::AmqpProperties^ DefaultProperties { - AmqpTypes::AmqpProperties^ get () { return defaultProperties; } - void set(AmqpTypes::AmqpProperties^ p) { defaultProperties = p; } - } - - property String^ DefaultSubject { - String^ get() { return (qpidAddress == nullptr) ? nullptr : qpidAddress->RoutingKey; } - } - - property String^ QpidSubject { - String^ get() { return (qpidAddress == nullptr) ? nullptr : qpidAddress->Subject; } - } - -}; - - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp b/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp deleted file mode 100644 index bfae1ab313..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp +++ /dev/null @@ -1,304 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - - -/* - * This program parses strings of the form "node/subject;{options}" as - * used in the Qpid messaging API. It provides basic wiring - * capabilities to create/delete temporary queues (to topic - * subsciptions) and unbound "point and shoot" queues. - */ - - -#include -#include -#include - -#include "qpid/client/AsyncSession.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/Connection.h" -#include "qpid/client/SessionImpl.h" -#include "qpid/client/SessionBase_0_10Access.h" -#include "qpid/client/Message.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/client/Future.h" - -#include "AmqpConnection.h" -#include "AmqpSession.h" -#include "AmqpMessage.h" -#include "MessageBodyStream.h" -#include "InputLink.h" -#include "OutputLink.h" -#include "QpidMarshal.h" -#include "QpidException.h" -#include "QpidAddress.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Runtime::InteropServices; -using namespace msclr; - -using namespace qpid::client; -using namespace std; - -QpidAddress::QpidAddress(String^ s, bool isInput) { - address = s; - nodeName = s; - isInputChannel = isInput; - isQueue = true; - - if (address->StartsWith("//")) { - // special case old style address to default exchange, - // no options, output only - if ((s->IndexOf(';') != -1) || isInputChannel) - throw gcnew ArgumentException("Invalid 0-10 address: " + address); - nodeName = nodeName->Substring(2); - return; - } - - String^ options = nullptr; - int pos = s->IndexOf(';'); - if (pos != -1) { - options = s->Substring(pos + 1); - nodeName = s->Substring(0, pos); - - if (options->Length > 0) { - if (!options->StartsWith("{") || !options->EndsWith("}")) - throw gcnew ArgumentException("Invalid address: " + address); - options = options->Substring(1, options->Length - 2); - array^ subOpts = options->Split(String(",: ").ToCharArray(), StringSplitOptions::RemoveEmptyEntries); - - if ((subOpts->Length % 2) != 0) - throw gcnew ArgumentException("Bad address (options): " + address); - - for (int i=0; i < subOpts->Length; i += 2) { - String^ opt = subOpts[i]; - String^ optArg = subOpts[i+1]; - if (opt->Equals("create")) { - creating = PolicyApplies(optArg); - } - else if (opt->Equals("delete")) { - deleting = PolicyApplies(optArg); - } - else if (opt->Equals("mode")) { - if (optArg->Equals("browse")) { - browsing = isInputChannel; - } - else if (!optArg->Equals("consume")) { - throw gcnew ArgumentException("Invalid browsing option: " + optArg); - } - } - else if (opt->Equals("assert") || opt->Equals("node")) { - throw gcnew ArgumentException("Unsupported address option: " + opt); - } - else { - throw gcnew ArgumentException("Bad address option: " + opt); - } - } - } - else - options = nullptr; - } - - pos = nodeName->IndexOf('/'); - if (pos != -1) { - subject = nodeName->Substring(pos + 1); - if (String::IsNullOrEmpty(subject)) - subject = nullptr; - nodeName = nodeName->Substring(0, pos); - } -} - - -QpidAddress^ QpidAddress::CreateAddress(String^ s, bool isInput) { - QpidAddress^ addr = gcnew QpidAddress(s, isInput); - return addr; -} - - -void QpidAddress::ResolveLink(AmqpSession^ amqpSession) { - - AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer(); - if (asyncSessionp == NULL) - throw gcnew ObjectDisposedException("session"); - - deleteName = nullptr; - isQueue = true; - - try { - Session session = sync(*asyncSessionp); - std::string n_name = QpidMarshal::ToNative(nodeName); - ExchangeBoundResult result = session.exchangeBound(arg::exchange=n_name, arg::queue=n_name); - - bool queueFound = !result.getQueueNotFound(); - bool exchangeFound = !result.getExchangeNotFound(); - - if (isInputChannel) { - - if (queueFound) { - linkName = nodeName; - if (deleting) - deleteName = nodeName; - } - else if (exchangeFound) { - isQueue = false; - String^ tmpkey = nullptr; - String^ tmpname = nodeName + "_" + Guid::NewGuid().ToString(); - bool haveSubject = !String::IsNullOrEmpty(subject); - FieldTable bindArgs; - - std::string exchangeType = session.exchangeQuery(n_name).getType(); - if (exchangeType == "topic") { - tmpkey = haveSubject ? subject : "#"; - } - else if (exchangeType == "fanout") { - tmpkey = tmpname; - } - else if (exchangeType == "headers") { - tmpkey = haveSubject ? subject : "match-all"; - if (haveSubject) - bindArgs.setString("qpid.subject", QpidMarshal::ToNative(subject)); - bindArgs.setString("x-match", "all"); - } - else if (exchangeType == "xml") { - tmpkey = haveSubject ? subject : ""; - if (haveSubject) { - String^ v = "declare variable $qpid.subject external; $qpid.subject = '" + - subject + "'"; - bindArgs.setString("xquery", QpidMarshal::ToNative(v)); - } - else - bindArgs.setString("xquery", "true()"); - } - else { - tmpkey = haveSubject ? subject : ""; - } - - std::string qn = QpidMarshal::ToNative(tmpname); - session.queueDeclare(arg::queue=qn, arg::autoDelete=true, arg::exclusive=true); - bool success = false; - try { - session.exchangeBind(arg::exchange=n_name, arg::queue=qn, - arg::bindingKey=QpidMarshal::ToNative(tmpkey), - arg::arguments=bindArgs); - bindKey = tmpkey; // remember for later cleanup - success = true; - } - finally { - if (!success) - session.queueDelete(arg::queue=qn); - } - linkName = tmpname; - deleteName = tmpname; - deleting = true; - } - else if (creating) { - // only create "point and shoot" queues for now - session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName)); - // leave unbound - - linkName = nodeName; - - if (deleting) - deleteName = nodeName; - } - else { - throw gcnew ArgumentException("AMQP broker node not found: " + nodeName); - } - } - else { - // Output channel - - bool oldStyleUri = address->StartsWith("//"); - - if (queueFound) { - linkName = ""; // default exchange for point and shoot - routingKey = nodeName; - if (deleting) - deleteName = nodeName; - } - else if (exchangeFound && !oldStyleUri) { - isQueue = false; - linkName = nodeName; - routingKey = subject; - } - else if (creating) { - // only create "point and shoot" queues for now - session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName)); - // leave unbound - linkName = ""; - routingKey = nodeName; - if (deleting) - deleteName = nodeName; - } - else { - throw gcnew ArgumentException("AMQP broker node not found: " + nodeName); - } - } - } - finally { - amqpSession->ReturnNativeSession(); - } -} - -void QpidAddress::CleanupLink(AmqpSession^ amqpSession) { - if (deleteName == nullptr) - return; - - AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer(); - if (asyncSessionp == NULL) { - // TODO: log it: can't undo tear down actions - return; - } - - try { - Session session = sync(*asyncSessionp); - std::string q = QpidMarshal::ToNative(deleteName); - if (isInputChannel && !isQueue) { - // undo the temp wiring to the topic - session.exchangeUnbind(arg::exchange=QpidMarshal::ToNative(nodeName), arg::queue=q, - arg::bindingKey=QpidMarshal::ToNative(bindKey)); - } - session.queueDelete(q); - } - catch (Exception^ e) { - // TODO: log it - } - finally { - amqpSession->ReturnNativeSession(); - } -} - -bool QpidAddress::PolicyApplies(String^ mode) { - if (mode->Equals("always")) - return true; - if (mode->Equals("sender")) - return !isInputChannel; - if (mode->Equals("receiver")) - return isInputChannel; - if (mode->Equals("never")) - return false; - - throw gcnew ArgumentException(String::Format("Bad address option {0} for {1}", mode, address)); -} - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h b/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h deleted file mode 100644 index d24317c2aa..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h +++ /dev/null @@ -1,89 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#pragma once - -#include "MessageWaiter.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Threading; -using namespace System::Runtime::InteropServices; - -using namespace qpid::client; -using namespace std; - - -public ref class QpidAddress -{ -private: - QpidAddress(String^ address, bool isInput); - - // the original Qpid messaging address string, with WCF uri sematics removed, and URL decoded - String^ address; - - String^ nodeName; - // "qpid.subject" - String^ subject; - // 0-10 routing key (Output channels only) - String^ routingKey; - - String^ linkName; - String^ deleteName; - String^ bindKey; - - // node type: queue/topic - bool isQueue; - - // direction - bool isInputChannel; - - bool creating; - bool deleting; - bool browsing; - - bool PolicyApplies(String^ mode); - -internal: - static QpidAddress^ CreateAddress(String ^s, bool isInput); - void ResolveLink(AmqpSession^ amqpSession); - void CleanupLink(AmqpSession^ amqpSession); - - property String^ LinkName { - String^ get () { return linkName; } - } - - property String^ Subject { - String^ get () { return subject; } - } - - property String^ RoutingKey { - String^ get () { return routingKey; } - } - - property bool Browsing { - bool get () { return browsing; } - } - -}; - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidException.h b/qpid/wcf/src/Apache/Qpid/Interop/QpidException.h deleted file mode 100644 index 91677a5e73..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/QpidException.h +++ /dev/null @@ -1,37 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#pragma once - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; - -public ref class QpidException : System::Exception -{ - public: - - QpidException() : System::Exception() {} - QpidException(String^ estring) : System::Exception(estring) {} - -}; - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h b/qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h deleted file mode 100644 index 3e22af7b39..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h +++ /dev/null @@ -1,53 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#pragma once - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Text; - - -// Helper functions for marshaling. - -private ref class QpidMarshal -{ - public: - - // marshal_as not available in all Visual Studio editions. - - static std::string ToNative (System::String^ managed) { - if (managed->Length == 0) { - return std::string(); - } - array^ mbytes = Encoding::UTF8->GetBytes(managed); - if (mbytes->Length == 0) { - return std::string(); - } - - pin_ptr pinnedBuf = &mbytes[0]; - std::string native((char *) pinnedBuf, mbytes->Length); - return native; - } -}; - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp b/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp deleted file mode 100644 index 23743316ff..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp +++ /dev/null @@ -1,525 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "qpid/client/AsyncSession.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/Connection.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/framing/Xid.h" - -#include "QpidException.h" -#include "AmqpConnection.h" -#include "AmqpSession.h" -#include "DtxResourceManager.h" -#include "XaTransaction.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Runtime::InteropServices; -using namespace System::Transactions; -using namespace msclr; - -using namespace qpid::framing::dtx; - -// ------------------------------------------------------------------------ -// Start of a pure native code section -#pragma unmanaged -// ------------------------------------------------------------------------ - -// This is the native COM object the DTC expects to talk to for coordination. -// There is exactly one native instance of this for each managed XaTransaction object. - - -class DtcCallbackHandler : public ITransactionResourceAsync -{ -private: - long useCount; - DtcCallbackFp managedCallback; -public: - ITransactionEnlistmentAsync *txHandle; - DtcCallbackHandler(DtcCallbackFp cbp) : managedCallback(cbp), useCount(0) {} - ~DtcCallbackHandler() {} - virtual HRESULT __stdcall PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase); - virtual HRESULT __stdcall CommitRequest(DWORD grfrm, XACTUOW *unused); - virtual HRESULT __stdcall AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3); - - virtual HRESULT __stdcall TMDown(); - virtual HRESULT __stdcall DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject); - virtual ULONG __stdcall DtcCallbackHandler::AddRef(); - virtual ULONG __stdcall DtcCallbackHandler::Release(); - void __stdcall AbortRequestDone(); -}; - - -HRESULT DtcCallbackHandler::PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase) -{ - if (singlePhase) { - return managedCallback(DTC_SINGLE_PHASE) ? S_OK : E_FAIL; - } - - return managedCallback(DTC_PREPARE) ? S_OK : E_FAIL; -} - - -HRESULT DtcCallbackHandler::CommitRequest(DWORD grfrm, XACTUOW *unused) -{ - return managedCallback(DTC_COMMIT) ? S_OK : E_FAIL; -} - -HRESULT DtcCallbackHandler::AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3) -{ - return managedCallback(DTC_ABORT) ? S_OK : E_FAIL; -} - - -HRESULT DtcCallbackHandler::TMDown() -{ - return managedCallback(DTC_TMDOWN) ? S_OK : E_FAIL; -} - - -HRESULT DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject) -{ - *ppvObject = NULL; - - if ((riid == IID_IUnknown) || (riid == IID_IResourceManagerSink)) - *ppvObject = this; - else - return ResultFromScode(E_NOINTERFACE); - - this->AddRef(); - return S_OK; -} - - -ULONG DtcCallbackHandler::AddRef() -{ - return InterlockedIncrement(&useCount); -} - - -ULONG DtcCallbackHandler::Release() -{ - long uc = InterlockedDecrement(&useCount); - - if (uc) - return uc; - - delete this; - return 0; -} - - -// ------------------------------------------------------------------------ -// End of pure native code section -#pragma managed -// ------------------------------------------------------------------------ - -#ifdef QPID_RECOVERY_TEST_HOOK -void XaTransaction::ForceRecovery() { - debugFailMode = true; -} -#endif - -// ------------------------------------------------------------------------ -// ------------------------------------------------------------------------ - - -XaTransaction::XaTransaction(Transaction^ t, IDtcToXaHelperSinglePipe *xaHelperp, DWORD rmCookie, DtxResourceManager^ rm) { - bool success = false; - xidp = NULL; - commandCompletionp = NULL; - firstDtxStartCompletionp = NULL; - nativeHandler = NULL; - resourceManager = rm; - controlSession = rm->DtxControlSession; - active = true; - preparing = false; - systemTransaction = t; - IntPtr comTxp = IntPtr::Zero; - completionHandle = gcnew ManualResetEvent(false); - - try { - enlistedSessions = gcnew Collections::Generic::List(); - - // take a System.Transactions.Transaction and obtain - // the corresponding DTC COM object. - IDtcTransaction^ dtcTransaction = TransactionInterop::GetDtcTransaction(t); - comTxp = Marshal::GetIUnknownForObject(dtcTransaction); - XID winXid; - HRESULT hr = xaHelperp->ConvertTridToXID((DWORD *)comTxp.ToPointer(), rmCookie, &winXid); - if (hr != S_OK) - throw gcnew QpidException("get XA XID"); - - // Convert the X/Open format to the internal Qpid format - xidp = new qpid::framing::Xid(); - xidp->setFormat((uint32_t) winXid.formatID); - int bqualPos = 0; - if (winXid.gtrid_length > 0) { - xidp->setGlobalId(std::string(winXid.data, winXid.gtrid_length)); - bqualPos = winXid.gtrid_length; - } - if (winXid.bqual_length > 0) { - xidp->setBranchId(std::string(winXid.data + bqualPos, winXid.bqual_length)); - } - - // create the callback chain: DTC proxy -> DtcCallbackHandler -> this - inboundDelegate = gcnew DtcCallbackDelegate(this, &XaTransaction::DtcCallback); - IntPtr ip = Marshal::GetFunctionPointerForDelegate(inboundDelegate); - nativeHandler = new DtcCallbackHandler(static_cast(ip.ToPointer())); - // add myself for later smart pointer destruction - nativeHandler->AddRef(); - - hr = xaHelperp->EnlistWithRM(rmCookie, (ITransaction *)comTxp.ToPointer(), nativeHandler, &(nativeHandler->txHandle)); - - if (hr != S_OK) - throw gcnew QpidException("Enlist"); - - success = true; - } - finally { - if (!success) - Cleanup(); - if (comTxp != IntPtr::Zero) - ((IUnknown *) comTxp.ToPointer())->Release(); - } -} - - -void XaTransaction::Cleanup() { - if (firstDtxStartCompletionp != NULL) { - try { - firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp); - } - catch (...) { - // TODO: log it? - } - - firstDtxStartCompletionp = NULL; - } - - if (nativeHandler != NULL) { - nativeHandler->Release(); - nativeHandler = NULL; - } - if (xidp != NULL) { - delete xidp; - xidp = NULL; - } -} - - -XaTransaction^ XaTransaction::Enlist (AmqpSession ^session) { - lock l(enlistedSessions); - if (!active) - throw gcnew QpidException("transaction enlistment internal error"); - if (!enlistedSessions->Contains(session)) { - enlistedSessions->Add(session); - if (firstEnlistedSession == nullptr) { - firstEnlistedSession = session; - IntPtr intptr = session->DtxStart((IntPtr) xidp, false, false); - firstDtxStartCompletionp = (TypedResult *) intptr.ToPointer(); - } - else { - // the broker must see the dtxStart as a join operation, and it must arrive - // at the broker after the first dtx start - if (firstDtxStartCompletionp != NULL) - firstDtxStartCompletionp->wait(); - session->DtxStart((IntPtr) xidp, true, false); - } - } - else { - // already started once, so resume is true - session->DtxStart((IntPtr) xidp, false, true); - } - return this; -} - - -void XaTransaction::SessionClosing(AmqpSession^ session) { - lock l(enlistedSessions); - if (!enlistedSessions->Contains(session)) - return; - - enlistedSessions->Remove(session); - if (!active) { - // Phase0Flush already done on all sessions - l.release(); - return; - } - - IntPtr completion = session->BeginPhase0Flush(this); - session->EndPhase0Flush(this, completion); - - if (session == firstEnlistedSession) { - // if we just completed the dtxEnd, we know the dtxStart completed before that - if (firstDtxStartCompletionp != NULL) { - firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp); - firstDtxStartCompletionp = NULL; - } - } -} - - -void XaTransaction::Phase0Flush() { - // let each session delimit their transactional work with an AMQP dtx.end protocol frame - lock l(enlistedSessions); - if (!active) - return; - - active = false; // no more enlistments - int scount = enlistedSessions->Count; - - if (scount > 0) { - array ^completions = gcnew array(scount); - for (int i = 0; i < scount; i++) { - - // TODO: skip phase0 flush for rollback case - - completions[i] = enlistedSessions[i]->BeginPhase0Flush(this); - } - - for (int i = 0; i < scount; i++) { - // without each session.sync(), session commands are queued up in the right order, - // but on their separate outbound channels, and destined for receipt at separate Broker inbound - // channels. It is not clear how to be sure Phase 0 dtx.End is processed in the - // correct order before commit on the broker without the sync. - enlistedSessions[i]->EndPhase0Flush(this, completions[i]); - } - } - - // since all dtxEnds have completed, we know all starts have too - if (firstDtxStartCompletionp != NULL) { - try { - firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp); - } - catch (...) { - // TODO: log it? - } - - firstDtxStartCompletionp = NULL; - } -} - - -bool XaTransaction::DtcCallback (DtcCallbackType callback) { - // called by the DTC proxy thread. Be brief and don't block (but Phase0Flush?) - - if (AppDomain::CurrentDomain->IsFinalizingForUnload()) - return false; - - IntPtr intptr = IntPtr::Zero; - currentCommand = callback; - - try { - switch (callback) { - case DTC_PREPARE: - Phase0Flush(); - try { - intptr = controlSession->DtxPrepare((IntPtr) xidp); - preparing = true; - resourceManager->IncrementDoubt(); - } - catch (System::Exception^ ) { - // intptr remains nullptr - } - commandCompletionp = (TypedResult *) intptr.ToPointer(); - ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter)); - break; - - case DTC_COMMIT: -#ifdef QPID_RECOVERY_TEST_HOOK - if (debugFailMode){ return; } -#endif - // no phase 0 required. always preceded by a prepare - try { - intptr = controlSession->DtxCommit((IntPtr) xidp, false); - } - catch (System::Exception^ ) { - // intptr remains nullptr - } - commandCompletionp = (TypedResult *) intptr.ToPointer(); - ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter)); - break; - - case DTC_ABORT: - Phase0Flush(); -#ifdef QPID_RECOVERY_TEST_HOOK - if (debugFailMode){ return; } -#endif - try { - intptr = controlSession->DtxRollback((IntPtr) xidp); - } - catch (System::Exception^ ) { - // intptr remains nullptr - } - commandCompletionp = (TypedResult *) intptr.ToPointer(); - ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter)); - break; - - case DTC_SINGLE_PHASE: - Phase0Flush(); - try { - intptr = controlSession->DtxCommit((IntPtr) xidp, true); - } - catch (System::Exception^ ) { - // intptr remains nullptr - } - commandCompletionp = (TypedResult *) intptr.ToPointer(); - ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter)); - break; - - case DTC_TMDOWN: - commandCompletionp = NULL; - ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter)); - break; - } - return true; - } - catch (System::Exception^ e) { - // TODO: log it - Console::WriteLine("Unexpected DtcCallback exception: {0}", e->ToString()); - } - catch (...) { - // TODO: log it - } - return false; -} - - -// this handles the case where the application regains control for -// a new transaction before we are notified (abort/rollback -// optimization in DTC). - -void XaTransaction::NotifyPhase0() { - if (active) - Phase0Flush(); -} - - -void XaTransaction::AsyncCompleter(Object ^unused) { - bool success = false; - - if (commandCompletionp != NULL) { - try { - // waits for the AMQP broker's response and returns the decoded content - XaResult& xaResult = commandCompletionp->get(); - if (xaResult.hasStatus()) { - if (xaResult.getStatus() == XaStatus::XA_STATUS_XA_OK) { - success = true; - } - } - } - catch (...) { - // TODO: log it? - } - try { - controlSession->ReleaseCompletion((IntPtr) commandCompletionp); - } - catch (...) { - // TODO: log it? - } - - commandCompletionp = NULL; - } - - ITransactionEnlistmentAsync *dtcTxHandle = nativeHandler->txHandle; - - HRESULT hr = success ? S_OK : E_FAIL; - - switch (currentCommand) { - case DTC_PREPARE: - dtcTxHandle->PrepareRequestDone(hr, NULL, NULL); - break; - - case DTC_COMMIT: - dtcTxHandle->CommitRequestDone(hr); - if (success) - resourceManager->DecrementDoubt(); - Complete(); - break; - - case DTC_ABORT: - dtcTxHandle->AbortRequestDone(hr); - if (success) { - if (preparing) { - preparing = false; - resourceManager->DecrementDoubt(); - } - } - Complete(); - break; - - case DTC_SINGLE_PHASE: - if (success) - hr = XACT_S_SINGLEPHASE; - dtcTxHandle->PrepareRequestDone(hr, NULL, NULL); - Complete(); - break; - - case DTC_TMDOWN: - // Stop the RM from accepting new enlistments - resourceManager->TmDown(); - Complete(); - break; - } -} - - -void XaTransaction::Complete() { - Cleanup(); - resourceManager->Complete(systemTransaction); - completionHandle->Set(); -} - - -void XaTransaction::WaitForCompletion() { - completionHandle->WaitOne(); -} - - - /* -void XaTransaction::WaitForFlush() { - isFlushedHandle->WaitOne(); -} - */ - -// called from DtxResourceManager Finalize - -void XaTransaction::ChildFinalize() { - lock l(enlistedSessions); - Phase0Flush(); - Cleanup(); -} - - - -}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.h b/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.h deleted file mode 100644 index 8ff9f99893..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.h +++ /dev/null @@ -1,96 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#pragma once - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Threading; -using namespace System::Transactions; - -enum DtcCallbackType{ - DTC_PREPARE, - DTC_COMMIT, - DTC_ABORT, - DTC_SINGLE_PHASE, - DTC_TMDOWN -}; - - -ref class DtxResourceManager; -class DtcCallbackHandler; - -// Function pointer declaratiom for managed space delegate -typedef bool (__stdcall *DtcCallbackFp)(DtcCallbackType); - -// and the delegate with the same signature -public delegate bool DtcCallbackDelegate(DtcCallbackType); - - - -public ref class XaTransaction -{ -private: - bool active; - DtxResourceManager^ resourceManager; - Transaction^ systemTransaction; - AmqpSession^ controlSession; - Collections::Generic::List^ enlistedSessions; - qpid::framing::Xid* xidp; - DtcCallbackHandler* nativeHandler; - bool preparing; - DtcCallbackDelegate^ inboundDelegate; - // the Qpid async result of the AMQP dtx prepare/commit commands - TypedResult* commandCompletionp; - // the Qpid async result of the first session to do dtx start - TypedResult* firstDtxStartCompletionp; - ManualResetEvent^ completionHandle; - - AmqpSession^ firstEnlistedSession; - DtcCallbackType currentCommand; - void AsyncCompleter(Object ^); - void Phase0Flush(); - void Cleanup(); - void Complete(); - -internal: - XaTransaction(Transaction^ t, IDtcToXaHelperSinglePipe *pXaHelper, DWORD rmCookie, DtxResourceManager^ rm); - XaTransaction^ Enlist (AmqpSession ^session); - bool DtcCallback (DtcCallbackType callback); - void NotifyPhase0(); - void ChildFinalize(); - void SessionClosing(AmqpSession^ session); - void WaitForCompletion(); - - property IntPtr XidHandle { - IntPtr get () { return (IntPtr) xidp; } - } - -#ifdef QPID_RECOVERY_TEST_HOOK - void ForceRecovery(); - bool debugFailMode; -#endif - -}; - -}}} // namespace Apache::Qpid::Interop - diff --git a/qpid/wcf/src/wcfnet.snk b/qpid/wcf/src/wcfnet.snk deleted file mode 100644 index d6456c8cf3..0000000000 Binary files a/qpid/wcf/src/wcfnet.snk and /dev/null differ -- cgit v1.2.1