diff options
| author | Steven Shaw <steshaw@apache.org> | 2006-11-25 22:04:39 +0000 |
|---|---|---|
| committer | Steven Shaw <steshaw@apache.org> | 2006-11-25 22:04:39 +0000 |
| commit | 7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6 (patch) | |
| tree | 3122525268281cd9df870e0a9cb309ee7410a424 /dotnet/Qpid.Client/Client/Message | |
| parent | 8f32ca18d5281eaa5baafa769c99fa70c830b14f (diff) | |
| download | qpid-python-7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6.tar.gz | |
QPID-128 Initial import of the C# sources.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@479211 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client/Message')
11 files changed, 2009 insertions, 0 deletions
diff --git a/dotnet/Qpid.Client/Client/Message/AMQMessage.cs b/dotnet/Qpid.Client/Client/Message/AMQMessage.cs new file mode 100644 index 0000000000..eb34fa45db --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/AMQMessage.cs @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Qpid.Framing; + +namespace Qpid.Client.Message +{ + public class AMQMessage + { + protected IContentHeaderProperties _contentHeaderProperties; + + /// <summary> + /// If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required + /// </summary> + protected AmqChannel _channel; + + public AMQMessage(IContentHeaderProperties properties) + { + _contentHeaderProperties = properties; + } + + public AmqChannel Channel + { + get + { + return _channel; + } + + set + { + _channel = value; + } + } + } +} diff --git a/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs new file mode 100644 index 0000000000..e485bb6d34 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections; +using Qpid.Framing; + +namespace Qpid.Client.Message +{ + public abstract class AbstractQmsMessageFactory : IMessageFactory + { + public AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered, ContentHeaderBody contentHeader, IList bodies) + { + AbstractQmsMessage msg = CreateMessageWithBody(messageNbr, contentHeader, bodies); + msg.Redelivered = redelivered; + return msg; + } + + public abstract AbstractQmsMessage CreateMessage(); + + /// <summary> + /// + /// </summary> + /// <param name="messageNbr"></param> + /// <param name="contentHeader"></param> + /// <param name="bodies"></param> + /// <returns></returns> + /// <exception cref="AMQException"></exception> + protected abstract AbstractQmsMessage CreateMessageWithBody(ulong messageNbr, + ContentHeaderBody contentHeader, + IList bodies); + } +} diff --git a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs new file mode 100644 index 0000000000..c84e9de1b9 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs @@ -0,0 +1,800 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text; +using log4net; +using Qpid.Framing; +using Qpid.Messaging; + +namespace Qpid.Client.Message +{ + public class SendOnlyDestination : AMQDestination + { + private static readonly ILog _log = LogManager.GetLogger(typeof(string)); + + public SendOnlyDestination(string exchangeName, string routingKey) + : base(exchangeName, null, null, false, false, routingKey) + { + _log.Debug( + string.Format("Creating SendOnlyDestination with exchangeName={0} and routingKey={1}", + exchangeName, routingKey)); + } + + public override string EncodedName + { + get { return ExchangeName + ":" + QueueName; } + } + + public override string RoutingKey + { + get { return QueueName; } + } + + public override bool IsNameRequired + { + get { throw new NotImplementedException(); } + } + } + + public abstract class AbstractQmsMessage : AMQMessage, IMessage + { + private static readonly ILog _log = LogManager.GetLogger(typeof(AbstractQmsMessage)); + + protected ulong _messageNbr; + + protected bool _redelivered; + + protected AbstractQmsMessage() : base(new BasicContentHeaderProperties()) + { + } + + protected AbstractQmsMessage(ulong messageNbr, BasicContentHeaderProperties contentHeader) + : this(contentHeader) + { + _messageNbr = messageNbr; + } + + protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader) + : base(contentHeader) + { + } + + public string MessageId + { + get + { + if (ContentHeaderProperties.MessageId == null) + { + ContentHeaderProperties.MessageId = "ID:" + _messageNbr; + } + return ContentHeaderProperties.MessageId; + } + set + { + ContentHeaderProperties.MessageId = value; + } + } + + public long Timestamp + { + get + { + // TODO: look at ulong/long choice + return (long) ContentHeaderProperties.Timestamp; + } + set + { + ContentHeaderProperties.Timestamp = (ulong) value; + } + } + + public byte[] CorrelationIdAsBytes + { + get + { + return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId); + } + set + { + ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value); + } + } + + public string CorrelationId + { + get + { + return ContentHeaderProperties.CorrelationId; + } + set + { + ContentHeaderProperties.ContentType = value; + } + } + + struct Dest + { + public string ExchangeName; + public string RoutingKey; + + public Dest(string exchangeName, string routingKey) + { + ExchangeName = exchangeName; + RoutingKey = routingKey; + } + } + + public string ReplyToExchangeName + { + get + { + Dest dest = ReadReplyToHeader(); + return dest.ExchangeName; + } + set + { + Dest dest = ReadReplyToHeader(); + dest.ExchangeName = value; + WriteReplyToHeader(dest); + } + } + + public string ReplyToRoutingKey + { + get + { + Dest dest = ReadReplyToHeader(); + return dest.RoutingKey; + } + set + { + Dest dest = ReadReplyToHeader(); + dest.RoutingKey = value; + WriteReplyToHeader(dest); + } + } + + private Dest ReadReplyToHeader() + { + string replyToEncoding = ContentHeaderProperties.ReplyTo; + if (replyToEncoding == null) + { + return new Dest(); + } + else + { + string routingKey; + string exchangeName = GetExchangeName(replyToEncoding, out routingKey); + return new Dest(exchangeName, routingKey); + } + } + + private void WriteReplyToHeader(Dest dest) + { + string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey); + ContentHeaderProperties.ReplyTo = encodedDestination; + } + + private static string GetExchangeName(string replyToEncoding, out string routingKey) + { + string[] split = replyToEncoding.Split(new char[':']); + if (_log.IsDebugEnabled) + { + _log.Debug(string.Format("replyToEncoding = '{0}'", replyToEncoding)); + _log.Debug(string.Format("split = {0}", split)); + _log.Debug(string.Format("split.Length = {0}", split.Length)); + } + if (split.Length == 1) + { + // Using an alternative split implementation here since it appears that string.Split + // is broken in .NET. It doesn't split when the first character is the delimiter. + // Here we check for the first character being the delimiter. This handles the case + // where ExchangeName is empty (i.e. sends will be to the default exchange). + if (replyToEncoding[0] == ':') + { + split = new string[2]; + split[0] = null; + split[1] = replyToEncoding.Substring(1); + if (_log.IsDebugEnabled) + { + _log.Debug("Alternative split method..."); + _log.Debug(string.Format("split = {0}", split)); + _log.Debug(string.Format("split.Length = {0}", split.Length)); + } + } + } + if (split.Length != 2) + { + throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding); + } + + string exchangeName = split[0]; + routingKey = split[1]; + return exchangeName; + } + + public DeliveryMode DeliveryMode + { + get + { + byte b = ContentHeaderProperties.DeliveryMode; + switch (b) + { + case 1: + return DeliveryMode.NonPersistent; + case 2: + return DeliveryMode.Persistent; + default: + throw new QpidException("Illegal value for delivery mode in content header properties"); + } + } + set + { + ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2); + } + } + + public bool Redelivered + { + get + { + return _redelivered; + } + set + { + _redelivered = value; + } + } + + public string Type + { + get + { + return MimeType; + } + set + { + //MimeType = value; + } + } + + public long Expiration + { + get + { + return ContentHeaderProperties.Expiration; + } + set + { + ContentHeaderProperties.Expiration = (uint) value; + } + } + + public int Priority + { + get + { + return ContentHeaderProperties.Priority; + } + set + { + ContentHeaderProperties.Priority = (byte) value; + } + } + + // FIXME: implement + public string ContentType + { + get { throw new NotImplementedException(); } + set { throw new NotImplementedException(); } + } + + // FIXME: implement + public string ContentEncoding + { + get { throw new NotImplementedException(); } + set { throw new NotImplementedException(); } + } + + public void Acknowledge() + { + // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge + // is not specified. In our case, we only set the session field where client acknowledge mode is specified. + if (_channel != null) + { + _channel.SendAcknowledgement(_messageNbr); + } + } + + public IHeaders Headers + { + get { return new QpidHeaders(this); } + } + + public abstract void ClearBody(); + + /// <summary> + /// Get a String representation of the body of the message. Used in the + /// toString() method which outputs this before message properties. + /// </summary> + /// <exception cref="QpidException"></exception> + public abstract string ToBodyString(); + + /// <summary> + /// Return the raw byte array that is used to populate the frame when sending + /// the message. + /// </summary> + /// <value>a byte array of message data</value> + public abstract byte[] Data + { + get; + set; + } + + public abstract string MimeType + { + get; + } + + public override string ToString() + { + try + { + StringBuilder buf = new StringBuilder("Body:\n"); + buf.Append(ToBodyString()); + buf.Append("\nQmsTimestamp: ").Append(Timestamp); + buf.Append("\nQmsExpiration: ").Append(Expiration); + buf.Append("\nQmsPriority: ").Append(Priority); + buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode); + buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName); + buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey); + buf.Append("\nAMQ message number: ").Append(_messageNbr); + buf.Append("\nProperties:"); + if (ContentHeaderProperties.Headers == null) + { + buf.Append("<NONE>"); + } + else + { + buf.Append(Headers.ToString()); + } + return buf.ToString(); + } + catch (Exception e) + { + return e.ToString(); + } + } + + public IFieldTable UnderlyingMessagePropertiesMap + { + get + { + return ContentHeaderProperties.Headers; + } + set + { + ContentHeaderProperties.Headers = (FieldTable)value; + } + } + + public FieldTable PopulateHeadersFromMessageProperties() + { + if (ContentHeaderProperties.Headers == null) + { + return null; + } + else + { + // + // We need to convert every property into a String representation + // Note that type information is preserved in the property name + // + FieldTable table = new FieldTable(); + foreach (DictionaryEntry entry in ContentHeaderProperties.Headers) + { + string propertyName = (string) entry.Key; + if (propertyName == null) + { + continue; + } + else + { + table[propertyName] = entry.Value.ToString(); + } + } + return table; + } + } + + /// <summary> + /// Get the AMQ message number assigned to this message + /// </summary> + /// <returns>the message number</returns> + public ulong MessageNbr + { + get + { + return _messageNbr; + } + set + { + _messageNbr = value; + } + } + + public BasicContentHeaderProperties ContentHeaderProperties + { + get + { + return (BasicContentHeaderProperties) _contentHeaderProperties; + } + } + } + + internal class QpidHeaders : IHeaders + { + public const char BOOLEAN_PROPERTY_PREFIX = 'B'; + public const char BYTE_PROPERTY_PREFIX = 'b'; + public const char SHORT_PROPERTY_PREFIX = 's'; + public const char INT_PROPERTY_PREFIX = 'i'; + public const char LONG_PROPERTY_PREFIX = 'l'; + public const char FLOAT_PROPERTY_PREFIX = 'f'; + public const char DOUBLE_PROPERTY_PREFIX = 'd'; + public const char STRING_PROPERTY_PREFIX = 'S'; + + AbstractQmsMessage _message; + + public QpidHeaders(AbstractQmsMessage message) + { + _message = message; + } + + public bool Contains(string name) + { + CheckPropertyName(name); + if (_message.ContentHeaderProperties.Headers == null) + { + return false; + } + else + { + // TODO: fix this + return _message.ContentHeaderProperties.Headers.Contains(STRING_PROPERTY_PREFIX + name); + } + } + + public void Clear() + { + if (_message.ContentHeaderProperties.Headers != null) + { + _message.ContentHeaderProperties.Headers.Clear(); + } + } + + public string this[string name] + { + get + { + return GetString(name); + } + set + { + SetString(name, value); + } + } + + public bool GetBoolean(string name) + { + CheckPropertyName(name); + if (_message.ContentHeaderProperties.Headers == null) + { + return false; + } + else + { + object b = _message.ContentHeaderProperties.Headers[BOOLEAN_PROPERTY_PREFIX + name]; + + if (b == null) + { + return false; + } + else + { + return (bool)b; + } + } + } + + public void SetBoolean(string name, bool b) + { + CheckPropertyName(name); + _message.ContentHeaderProperties.Headers[BOOLEAN_PROPERTY_PREFIX + name] = b; + } + + public byte GetByte(string propertyName) + { + CheckPropertyName(propertyName); + if (_message.ContentHeaderProperties.Headers == null) + { + return 0; + } + else + { + object b = _message.ContentHeaderProperties.Headers[BYTE_PROPERTY_PREFIX + propertyName]; + if (b == null) + { + return 0; + } + else + { + return (byte)b; + } + } + } + + public void SetByte(string propertyName, byte b) + { + CheckPropertyName(propertyName); + _message.ContentHeaderProperties.Headers[BYTE_PROPERTY_PREFIX + propertyName] = b; + } + + public short GetShort(string propertyName) + { + CheckPropertyName(propertyName); + if (_message.ContentHeaderProperties.Headers == null) + { + return 0; + } + else + { + object s = _message.ContentHeaderProperties.Headers[SHORT_PROPERTY_PREFIX + propertyName]; + if (s == null) + { + return 0; + } + else + { + return (short)s; + } + } + } + + public void SetShort(string propertyName, short i) + { + CheckPropertyName(propertyName); + _message.ContentHeaderProperties.Headers[SHORT_PROPERTY_PREFIX + propertyName] = i; + } + + public int GetInt(string propertyName) + { + CheckPropertyName(propertyName); + if (_message.ContentHeaderProperties.Headers == null) + { + return 0; + } + else + { + object i = _message.ContentHeaderProperties.Headers[INT_PROPERTY_PREFIX + propertyName]; + if (i == null) + { + return 0; + } + else + { + return (int)i; + } + } + } + + public void SetInt(string propertyName, int i) + { + CheckPropertyName(propertyName); + _message.ContentHeaderProperties.Headers[INT_PROPERTY_PREFIX + propertyName] = i; + } + + public long GetLong(string propertyName) + { + CheckPropertyName(propertyName); + if (_message.ContentHeaderProperties.Headers == null) + { + return 0; + } + else + { + object l = _message.ContentHeaderProperties.Headers[LONG_PROPERTY_PREFIX + propertyName]; + if (l == null) + { + // temp - the spec says do this but this throws a NumberFormatException + //return Long.valueOf(null).longValue(); + return 0; + } + else + { + return (long)l; + } + } + } + + public void SetLong(string propertyName, long l) + { + CheckPropertyName(propertyName); + _message.ContentHeaderProperties.Headers[LONG_PROPERTY_PREFIX + propertyName] = l; + } + + public float GetFloat(String propertyName) + { + CheckPropertyName(propertyName); + if (_message.ContentHeaderProperties.Headers == null) + { + return 0; + } + else + { + object f = _message.ContentHeaderProperties.Headers[FLOAT_PROPERTY_PREFIX + propertyName]; + if (f == null) + { + return 0; + } + else + { + return (float)f; + } + } + } + + public void SetFloat(string propertyName, float f) + { + CheckPropertyName(propertyName); + _message.ContentHeaderProperties.Headers[FLOAT_PROPERTY_PREFIX + propertyName] = f; + } + + public double GetDouble(string propertyName) + { + CheckPropertyName(propertyName); + if (_message.ContentHeaderProperties.Headers == null) + { + return 0; + } + else + { + object d = _message.ContentHeaderProperties.Headers[DOUBLE_PROPERTY_PREFIX + propertyName]; + if (d == null) + { + return 0; + } + else + { + return (double)d; + } + } + } + + public void SetDouble(string propertyName, double v) + { + CheckPropertyName(propertyName); + _message.ContentHeaderProperties.Headers[DOUBLE_PROPERTY_PREFIX + propertyName] = v; + } + + public string GetString(string propertyName) + { + CheckPropertyName(propertyName); + if (_message.ContentHeaderProperties.Headers == null) + { + return null; + } + else + { + return (string)_message.ContentHeaderProperties.Headers[STRING_PROPERTY_PREFIX + propertyName]; + } + } + + public void SetString(string propertyName, string value) + { + CheckPropertyName(propertyName); + CreatePropertyMapIfRequired(); + propertyName = STRING_PROPERTY_PREFIX + propertyName; + _message.ContentHeaderProperties.Headers[propertyName] = value; + } + + private void CheckPropertyName(string propertyName) + { + if (propertyName == null) + { + throw new ArgumentException("Property name must not be null"); + } + else if ("".Equals(propertyName)) + { + throw new ArgumentException("Property name must not be the empty string"); + } + + if (_message.ContentHeaderProperties.Headers == null) + { + _message.ContentHeaderProperties.Headers = new FieldTable(); + } + } + + private void CreatePropertyMapIfRequired() + { + if (_message.ContentHeaderProperties.Headers == null) + { + _message.ContentHeaderProperties.Headers = new FieldTable(); + } + } + + public override string ToString() + { + StringBuilder buf = new StringBuilder("{"); + int i = 0; + foreach (DictionaryEntry entry in _message.ContentHeaderProperties.Headers) + { + ++i; + if (i > 1) + { + buf.Append(", "); + } + string propertyName = (string)entry.Key; + if (propertyName == null) + { + buf.Append("\nInternal error: Property with NULL key defined"); + } + else + { + buf.Append(propertyName.Substring(1)); + + buf.Append(" : "); + + char typeIdentifier = propertyName[0]; + buf.Append(typeIdentifierToName(typeIdentifier)); + buf.Append(" = ").Append(entry.Value); + } + } + buf.Append("}"); + return buf.ToString(); + } + + private static string typeIdentifierToName(char typeIdentifier) + { + switch (typeIdentifier) + { + case BOOLEAN_PROPERTY_PREFIX: + return "boolean"; + case BYTE_PROPERTY_PREFIX: + return "byte"; + case SHORT_PROPERTY_PREFIX: + return "short"; + case INT_PROPERTY_PREFIX: + return "int"; + case LONG_PROPERTY_PREFIX: + return "long"; + case FLOAT_PROPERTY_PREFIX: + return "float"; + case DOUBLE_PROPERTY_PREFIX: + return "double"; + case STRING_PROPERTY_PREFIX: + return "string"; + default: + return "unknown ( '" + typeIdentifier + "')"; + } + } + } +} diff --git a/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs new file mode 100644 index 0000000000..2e71bfc948 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections; +using Qpid.Framing; + +namespace Qpid.Client.Message +{ + public interface IMessageFactory + { + /// <summary> + /// Create a message + /// </summary> + /// <param name="messageNbr"></param> + /// <param name="redelivered"></param> + /// <param name="contentHeader"></param> + /// <param name="bodies"></param> + /// <returns></returns> + /// <exception cref="QpidMessagingException">if the message cannot be created</exception> + AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered, + ContentHeaderBody contentHeader, + IList bodies); + + /// <summary> + /// Creates the message. + /// </summary> + /// <returns></returns> + /// <exception cref="QpidMessagingException">if the message cannot be created</exception> + AbstractQmsMessage CreateMessage(); + } +} + diff --git a/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs b/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs new file mode 100644 index 0000000000..3965d531bb --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using Qpid.Framing; +using Qpid.Messaging; + +namespace Qpid.Client.Message +{ + public class MessageFactoryRegistry + { + private readonly Hashtable _mimeToFactoryMap = new Hashtable(); + + public void RegisterFactory(string mimeType, IMessageFactory mf) + { + if (mf == null) + { + throw new ArgumentNullException("Message factory"); + } + if (mimeType == null) + { + throw new ArgumentNullException("mf"); + } + _mimeToFactoryMap[mimeType] = mf; + } + + public void DeregisterFactory(string mimeType) + { + _mimeToFactoryMap.Remove(mimeType); + } + + /// <summary> + /// Create a message. This looks up the MIME type from the content header and instantiates the appropriate + /// concrete message type. + /// </summary> + /// <param name="messageNbr">the AMQ message id</param> + /// <param name="redelivered">true if redelivered</param> + /// <param name="contentHeader">the content header that was received</param> + /// <param name="bodies">a list of ContentBody instances</param> + /// <returns>the message.</returns> + /// <exception cref="AMQException"/> + /// <exception cref="QpidException"/> + public AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered, + ContentHeaderBody contentHeader, + IList bodies) + { + BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.Properties; + + if (properties.ContentType == null) + { + properties.ContentType = ""; + } + + IMessageFactory mf = (IMessageFactory) _mimeToFactoryMap[properties.ContentType]; + if (mf == null) + { + throw new AMQException("Unsupport MIME type of " + properties.ContentType); + } + else + { + return mf.CreateMessage(messageNbr, redelivered, contentHeader, bodies); + } + } + + public AbstractQmsMessage CreateMessage(string mimeType) + { + if (mimeType == null) + { + throw new ArgumentNullException("Mime type must not be null"); + } + IMessageFactory mf = (IMessageFactory) _mimeToFactoryMap[mimeType]; + if (mf == null) + { + throw new AMQException("Unsupport MIME type of " + mimeType); + } + else + { + return mf.CreateMessage(); + } + } + + /// <summary> + /// Construct a new registry with the default message factories registered + /// </summary> + /// <returns>a message factory registry</returns> + public static MessageFactoryRegistry NewDefaultRegistry() + { + MessageFactoryRegistry mf = new MessageFactoryRegistry(); + mf.RegisterFactory("text/plain", new QpidTextMessageFactory()); + mf.RegisterFactory("text/xml", new QpidTextMessageFactory()); + mf.RegisterFactory("application/octet-stream", new QpidBytesMessageFactory()); + // TODO: use bytes message for default message factory + // MJA - just added this bit back in... + mf.RegisterFactory("", new QpidBytesMessageFactory()); + return mf; + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs b/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs new file mode 100644 index 0000000000..b7911b44b9 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs @@ -0,0 +1,581 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.IO; +using System.Text; +using Qpid.Framing; +using Qpid.Messaging; + +namespace Qpid.Client.Message +{ + public class QpidBytesMessage : AbstractQmsMessage, IBytesMessage + { + private const string MIME_TYPE = "application/octet-stream"; + + /// <summary> + /// The backingstore for the data + /// </summary> + private MemoryStream _dataStream; + + private int _bodyLength; + + private BinaryReader _reader; + + private BinaryWriter _writer; + + public QpidBytesMessage() : this(null) + { + } + + /// <summary> + /// Construct a bytes message with existing data. + /// </summary> + /// <param name="data">if data is not null, the message is immediately in read only mode. if data is null, it is in + /// write-only mode</param> + QpidBytesMessage(byte[] data) : base() + { + // superclass constructor has instantiated a content header at this point + ContentHeaderProperties.ContentType = MIME_TYPE; + if (data == null) + { + _dataStream = new MemoryStream(); + _writer = new BinaryWriter(_dataStream); + } + else + { + _dataStream = new MemoryStream(data); + _bodyLength = data.Length; + _reader = new BinaryReader(_dataStream); + } + } + + public QpidBytesMessage(ulong messageNbr, byte[] data, ContentHeaderBody contentHeader) + // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea + : base(messageNbr, (BasicContentHeaderProperties) contentHeader.Properties) + { + ContentHeaderProperties.ContentType = MIME_TYPE; + _dataStream = new MemoryStream(data); + _bodyLength = data.Length; + _reader = new BinaryReader(_dataStream); + } + + public override void ClearBody() + { + if (_reader != null) + { + _reader.Close(); + _reader = null; + } + _dataStream = new MemoryStream(); + _bodyLength = 0; + + _writer = new BinaryWriter(_dataStream); + } + + public override string ToBodyString() + { + CheckReadable(); + try + { + return GetText(); + } + catch (IOException e) + { + throw new QpidException(e.ToString()); + } + } + + private string GetText() + { + if (_dataStream != null) + { + // we cannot just read the underlying buffer since it may be larger than the amount of + // "filled" data. Length is not the same as Capacity. + byte[] data = new byte[_dataStream.Length]; + _dataStream.Read(data, 0, (int)_dataStream.Length); + return Encoding.UTF8.GetString(data); + } + else + { + return null; + } + } + + public override byte[] Data + { + get + { + if (_dataStream == null) + { + return null; + } + else + { + byte[] data = new byte[_dataStream.Length]; + _dataStream.Position = 0; + _dataStream.Read(data, 0, (int) _dataStream.Length); + return data; + } + } + set + { + throw new NotSupportedException("Cannot set data payload except during construction"); + } + } + + public override string MimeType + { + get + { + return MIME_TYPE; + } + } + + public long BodyLength + { + get + { + CheckReadable(); + return _bodyLength; + } + } + + /// <summary> + /// + /// </summary> + /// <exception cref="MessageNotReadableException">if the message is in write mode</exception> + private void CheckReadable() + { + if (_reader == null) + { + throw new MessageNotReadableException("You need to call reset() to make the message readable"); + } + } + + private void CheckWritable() + { + if (_reader != null) + { + throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); + } + } + + public bool ReadBoolean() + { + CheckReadable(); + try + { + return _reader.ReadBoolean(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public byte ReadByte() + { + CheckReadable(); + try + { + return _reader.ReadByte(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public short ReadSignedByte() + { + CheckReadable(); + try + { + return _reader.ReadSByte(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public short ReadShort() + { + CheckReadable(); + try + { + return _reader.ReadInt16(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public char ReadChar() + { + CheckReadable(); + try + { + return _reader.ReadChar(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public int ReadInt() + { + CheckReadable(); + try + { + return _reader.ReadInt32(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public long ReadLong() + { + CheckReadable(); + try + { + return _reader.ReadInt64(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public float ReadFloat() + { + CheckReadable(); + try + { + return _reader.ReadSingle(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public double ReadDouble() + { + CheckReadable(); + try + { + return _reader.ReadDouble(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public string ReadUTF() + { + CheckReadable(); + try + { + byte[] data = _reader.ReadBytes((int)_dataStream.Length); + return Encoding.UTF8.GetString(data); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public int ReadBytes(byte[] bytes) + { + if (bytes == null) + { + throw new ArgumentNullException("bytes"); + } + CheckReadable(); + try + { + return _reader.Read(bytes, 0, bytes.Length); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public int ReadBytes(byte[] bytes, int count) + { + CheckReadable(); + if (bytes == null) + { + throw new ArgumentNullException("bytes"); + } + if (count < 0) + { + throw new ArgumentOutOfRangeException("count must be >= 0"); + } + if (count > bytes.Length) + { + count = bytes.Length; + } + + try + { + return _reader.Read(bytes, 0, count); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteBoolean(bool b) + { + CheckWritable(); + try + { + _writer.Write(b); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteByte(byte b) + { + CheckWritable(); + try + { + _writer.Write(b); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteShort(short i) + { + CheckWritable(); + try + { + _writer.Write(i); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteChar(char c) + { + CheckWritable(); + try + { + _writer.Write(c); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteSignedByte(short value) + { + CheckWritable(); + try + { + _writer.Write(value); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteDouble(double value) + { + CheckWritable(); + try + { + _writer.Write(value); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteFloat(float value) + { + CheckWritable(); + try + { + _writer.Write(value); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteInt(int value) + { + CheckWritable(); + try + { + _writer.Write(value); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteLong(long value) + { + CheckWritable(); + try + { + _writer.Write(value); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void Write(int i) + { + CheckWritable(); + try + { + _writer.Write(i); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void Write(long l) + { + CheckWritable(); + try + { + _writer.Write(l); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void Write(float v) + { + CheckWritable(); + try + { + _writer.Write(v); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void Write(double v) + { + CheckWritable(); + try + { + _writer.Write(v); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteUTF(string value) + { + CheckWritable(); + try + { + byte[] encodedData = Encoding.UTF8.GetBytes(value); + _writer.Write(encodedData); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteBytes(byte[] bytes) + { + CheckWritable(); + try + { + _writer.Write(bytes); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteBytes(byte[] bytes, int offset, int length) + { + CheckWritable(); + try + { + _writer.Write(bytes, offset, length); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void Reset() + { + CheckWritable(); + try + { + _writer.Close(); + _writer = null; + _reader = new BinaryReader(_dataStream); + _bodyLength = (int) _dataStream.Length; + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs new file mode 100644 index 0000000000..3f2a6c531f --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using Qpid.Framing; + +namespace Qpid.Client.Message +{ + public class QpidBytesMessageFactory : AbstractQmsMessageFactory + { + protected override AbstractQmsMessage CreateMessageWithBody(ulong messageNbr, + ContentHeaderBody contentHeader, + IList bodies) + { + byte[] data; + + // we optimise the non-fragmented case to avoid copying + if (bodies != null && bodies.Count == 1) + { + data = ((ContentBody)bodies[0]).Payload; + } + else + { + data = new byte[(long)contentHeader.BodySize]; + int currentPosition = 0; + foreach (ContentBody cb in bodies) + { + Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length); + currentPosition += cb.Payload.Length; + } + } + + return new QpidBytesMessage(messageNbr, data, contentHeader); + } + + public override AbstractQmsMessage CreateMessage() + { + return new QpidBytesMessage(); + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs b/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs new file mode 100644 index 0000000000..4c16038d4b --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs @@ -0,0 +1,137 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using Qpid.Framing; +using Qpid.Messaging; + +namespace Qpid.Client.Message +{ + public class QpidTextMessage : AbstractQmsMessage, ITextMessage + { + private const string MIME_TYPE = "text/plain"; + + private byte[] _data; + + private string _decodedValue; + + public QpidTextMessage() : this(null, null) + { + } + + public QpidTextMessage(byte[] data, String encoding) : base() + { + // the superclass has instantied a content header at this point + ContentHeaderProperties.ContentType= MIME_TYPE; + _data = data; + ContentHeaderProperties.Encoding = encoding; + } + + public QpidTextMessage(ulong messageNbr, byte[] data, BasicContentHeaderProperties contentHeader) + : base(messageNbr, contentHeader) + { + contentHeader.ContentType = MIME_TYPE; + _data = data; + } + + public QpidTextMessage(byte[] data) : this(data, null) + { + } + + public QpidTextMessage(string text) + { + Text = text; + } + + public override void ClearBody() + { + _data = null; + _decodedValue = null; + } + + public override string ToBodyString() + { + return Text; + } + + public override byte[] Data + { + get + { + return _data; + } + set + { + _data = value; + } + } + + public override string MimeType + { + get + { + return MIME_TYPE; + } + } + + public string Text + { + get + { + if (_data == null && _decodedValue == null) + { + return null; + } + else if (_decodedValue != null) + { + return _decodedValue; + } + else + { + if (ContentHeaderProperties.Encoding != null) + { + // throw ArgumentException if the encoding is not supported + _decodedValue = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetString(_data); + } + else + { + _decodedValue = Encoding.Default.GetString(_data); + } + return _decodedValue; + } + } + + set + { + if (ContentHeaderProperties.Encoding == null) + { + _data = Encoding.Default.GetBytes(value); + } + else + { + // throw ArgumentException if the encoding is not supported + _data = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetBytes(value); + } + _decodedValue = value; + } + } + } +} diff --git a/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs new file mode 100644 index 0000000000..5457b2301e --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using Qpid.Framing; + +namespace Qpid.Client.Message +{ + public class QpidTextMessageFactory : AbstractQmsMessageFactory + { + protected override AbstractQmsMessage CreateMessageWithBody(ulong messageNbr, ContentHeaderBody contentHeader, + IList bodies) + { + byte[] data; + + // we optimise the non-fragmented case to avoid copying + if (bodies != null && bodies.Count == 1) + { + data = ((ContentBody)bodies[0]).Payload; + } + else + { + data = new byte[(int)contentHeader.BodySize]; + int currentPosition = 0; + foreach (ContentBody cb in bodies) + { + Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length); + currentPosition += cb.Payload.Length; + } + } + + return new QpidTextMessage(messageNbr, data, (BasicContentHeaderProperties)contentHeader.Properties); + } + + + public override AbstractQmsMessage CreateMessage() + { + return new QpidTextMessage(); + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs b/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs new file mode 100644 index 0000000000..679114d105 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using log4net; + +namespace Qpid.Client.Message +{ + /// <summary> + /// Raised when a message body is received unexpectedly by the client. This typically occurs when the + /// length of bodies received does not match with the declared length in the content header. + /// </summary> + public class UnexpectedBodyReceivedException : AMQException + { + public UnexpectedBodyReceivedException(ILog logger, string msg, Exception t) + : base(logger, msg, t) + { + } + + public UnexpectedBodyReceivedException(ILog logger, string msg) + : base(logger, msg) + { + } + + public UnexpectedBodyReceivedException(ILog logger, int errorCode, string msg) + : base(logger, errorCode, msg) + { + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs b/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs new file mode 100644 index 0000000000..cb4e64718b --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections; +using Qpid.Framing; + +namespace Qpid.Client.Message +{ + public class UnprocessedMessage + { + private ulong _bytesReceived = 0; + + public BasicDeliverBody DeliverBody; + public BasicReturnBody BounceBody; + public ushort ChannelId; + public ContentHeaderBody ContentHeader; + + /// <summary> + /// List of ContentBody instances. Due to fragmentation you don't know how big this will be in general + /// </summary> + /// TODO: write and use linked list class + public IList Bodies = new ArrayList(); + + public void ReceiveBody(ContentBody body) + { + Bodies.Add(body); + if (body.Payload != null) + { + _bytesReceived += (uint)body.Payload.Length; + } + } + + public bool IsAllBodyDataReceived() + { + return _bytesReceived == ContentHeader.BodySize; + } + } +} + |
