summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client/Message
diff options
context:
space:
mode:
authorSteven Shaw <steshaw@apache.org>2006-11-25 22:04:39 +0000
committerSteven Shaw <steshaw@apache.org>2006-11-25 22:04:39 +0000
commit7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6 (patch)
tree3122525268281cd9df870e0a9cb309ee7410a424 /dotnet/Qpid.Client/Client/Message
parent8f32ca18d5281eaa5baafa769c99fa70c830b14f (diff)
downloadqpid-python-7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6.tar.gz
QPID-128 Initial import of the C# sources.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@479211 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client/Message')
-rw-r--r--dotnet/Qpid.Client/Client/Message/AMQMessage.cs52
-rw-r--r--dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs49
-rw-r--r--dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs800
-rw-r--r--dotnet/Qpid.Client/Client/Message/IMessageFactory.cs49
-rw-r--r--dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs117
-rw-r--r--dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs581
-rw-r--r--dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs60
-rw-r--r--dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs137
-rw-r--r--dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs60
-rw-r--r--dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs48
-rw-r--r--dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs56
11 files changed, 2009 insertions, 0 deletions
diff --git a/dotnet/Qpid.Client/Client/Message/AMQMessage.cs b/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
new file mode 100644
index 0000000000..eb34fa45db
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Qpid.Framing;
+
+namespace Qpid.Client.Message
+{
+ public class AMQMessage
+ {
+ protected IContentHeaderProperties _contentHeaderProperties;
+
+ /// <summary>
+ /// If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required
+ /// </summary>
+ protected AmqChannel _channel;
+
+ public AMQMessage(IContentHeaderProperties properties)
+ {
+ _contentHeaderProperties = properties;
+ }
+
+ public AmqChannel Channel
+ {
+ get
+ {
+ return _channel;
+ }
+
+ set
+ {
+ _channel = value;
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
new file mode 100644
index 0000000000..e485bb6d34
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.Collections;
+using Qpid.Framing;
+
+namespace Qpid.Client.Message
+{
+ public abstract class AbstractQmsMessageFactory : IMessageFactory
+ {
+ public AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered, ContentHeaderBody contentHeader, IList bodies)
+ {
+ AbstractQmsMessage msg = CreateMessageWithBody(messageNbr, contentHeader, bodies);
+ msg.Redelivered = redelivered;
+ return msg;
+ }
+
+ public abstract AbstractQmsMessage CreateMessage();
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <param name="messageNbr"></param>
+ /// <param name="contentHeader"></param>
+ /// <param name="bodies"></param>
+ /// <returns></returns>
+ /// <exception cref="AMQException"></exception>
+ protected abstract AbstractQmsMessage CreateMessageWithBody(ulong messageNbr,
+ ContentHeaderBody contentHeader,
+ IList bodies);
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
new file mode 100644
index 0000000000..c84e9de1b9
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
@@ -0,0 +1,800 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using log4net;
+using Qpid.Framing;
+using Qpid.Messaging;
+
+namespace Qpid.Client.Message
+{
+ public class SendOnlyDestination : AMQDestination
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(string));
+
+ public SendOnlyDestination(string exchangeName, string routingKey)
+ : base(exchangeName, null, null, false, false, routingKey)
+ {
+ _log.Debug(
+ string.Format("Creating SendOnlyDestination with exchangeName={0} and routingKey={1}",
+ exchangeName, routingKey));
+ }
+
+ public override string EncodedName
+ {
+ get { return ExchangeName + ":" + QueueName; }
+ }
+
+ public override string RoutingKey
+ {
+ get { return QueueName; }
+ }
+
+ public override bool IsNameRequired
+ {
+ get { throw new NotImplementedException(); }
+ }
+ }
+
+ public abstract class AbstractQmsMessage : AMQMessage, IMessage
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(AbstractQmsMessage));
+
+ protected ulong _messageNbr;
+
+ protected bool _redelivered;
+
+ protected AbstractQmsMessage() : base(new BasicContentHeaderProperties())
+ {
+ }
+
+ protected AbstractQmsMessage(ulong messageNbr, BasicContentHeaderProperties contentHeader)
+ : this(contentHeader)
+ {
+ _messageNbr = messageNbr;
+ }
+
+ protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader)
+ : base(contentHeader)
+ {
+ }
+
+ public string MessageId
+ {
+ get
+ {
+ if (ContentHeaderProperties.MessageId == null)
+ {
+ ContentHeaderProperties.MessageId = "ID:" + _messageNbr;
+ }
+ return ContentHeaderProperties.MessageId;
+ }
+ set
+ {
+ ContentHeaderProperties.MessageId = value;
+ }
+ }
+
+ public long Timestamp
+ {
+ get
+ {
+ // TODO: look at ulong/long choice
+ return (long) ContentHeaderProperties.Timestamp;
+ }
+ set
+ {
+ ContentHeaderProperties.Timestamp = (ulong) value;
+ }
+ }
+
+ public byte[] CorrelationIdAsBytes
+ {
+ get
+ {
+ return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId);
+ }
+ set
+ {
+ ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value);
+ }
+ }
+
+ public string CorrelationId
+ {
+ get
+ {
+ return ContentHeaderProperties.CorrelationId;
+ }
+ set
+ {
+ ContentHeaderProperties.ContentType = value;
+ }
+ }
+
+ struct Dest
+ {
+ public string ExchangeName;
+ public string RoutingKey;
+
+ public Dest(string exchangeName, string routingKey)
+ {
+ ExchangeName = exchangeName;
+ RoutingKey = routingKey;
+ }
+ }
+
+ public string ReplyToExchangeName
+ {
+ get
+ {
+ Dest dest = ReadReplyToHeader();
+ return dest.ExchangeName;
+ }
+ set
+ {
+ Dest dest = ReadReplyToHeader();
+ dest.ExchangeName = value;
+ WriteReplyToHeader(dest);
+ }
+ }
+
+ public string ReplyToRoutingKey
+ {
+ get
+ {
+ Dest dest = ReadReplyToHeader();
+ return dest.RoutingKey;
+ }
+ set
+ {
+ Dest dest = ReadReplyToHeader();
+ dest.RoutingKey = value;
+ WriteReplyToHeader(dest);
+ }
+ }
+
+ private Dest ReadReplyToHeader()
+ {
+ string replyToEncoding = ContentHeaderProperties.ReplyTo;
+ if (replyToEncoding == null)
+ {
+ return new Dest();
+ }
+ else
+ {
+ string routingKey;
+ string exchangeName = GetExchangeName(replyToEncoding, out routingKey);
+ return new Dest(exchangeName, routingKey);
+ }
+ }
+
+ private void WriteReplyToHeader(Dest dest)
+ {
+ string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
+ ContentHeaderProperties.ReplyTo = encodedDestination;
+ }
+
+ private static string GetExchangeName(string replyToEncoding, out string routingKey)
+ {
+ string[] split = replyToEncoding.Split(new char[':']);
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug(string.Format("replyToEncoding = '{0}'", replyToEncoding));
+ _log.Debug(string.Format("split = {0}", split));
+ _log.Debug(string.Format("split.Length = {0}", split.Length));
+ }
+ if (split.Length == 1)
+ {
+ // Using an alternative split implementation here since it appears that string.Split
+ // is broken in .NET. It doesn't split when the first character is the delimiter.
+ // Here we check for the first character being the delimiter. This handles the case
+ // where ExchangeName is empty (i.e. sends will be to the default exchange).
+ if (replyToEncoding[0] == ':')
+ {
+ split = new string[2];
+ split[0] = null;
+ split[1] = replyToEncoding.Substring(1);
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug("Alternative split method...");
+ _log.Debug(string.Format("split = {0}", split));
+ _log.Debug(string.Format("split.Length = {0}", split.Length));
+ }
+ }
+ }
+ if (split.Length != 2)
+ {
+ throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
+ }
+
+ string exchangeName = split[0];
+ routingKey = split[1];
+ return exchangeName;
+ }
+
+ public DeliveryMode DeliveryMode
+ {
+ get
+ {
+ byte b = ContentHeaderProperties.DeliveryMode;
+ switch (b)
+ {
+ case 1:
+ return DeliveryMode.NonPersistent;
+ case 2:
+ return DeliveryMode.Persistent;
+ default:
+ throw new QpidException("Illegal value for delivery mode in content header properties");
+ }
+ }
+ set
+ {
+ ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2);
+ }
+ }
+
+ public bool Redelivered
+ {
+ get
+ {
+ return _redelivered;
+ }
+ set
+ {
+ _redelivered = value;
+ }
+ }
+
+ public string Type
+ {
+ get
+ {
+ return MimeType;
+ }
+ set
+ {
+ //MimeType = value;
+ }
+ }
+
+ public long Expiration
+ {
+ get
+ {
+ return ContentHeaderProperties.Expiration;
+ }
+ set
+ {
+ ContentHeaderProperties.Expiration = (uint) value;
+ }
+ }
+
+ public int Priority
+ {
+ get
+ {
+ return ContentHeaderProperties.Priority;
+ }
+ set
+ {
+ ContentHeaderProperties.Priority = (byte) value;
+ }
+ }
+
+ // FIXME: implement
+ public string ContentType
+ {
+ get { throw new NotImplementedException(); }
+ set { throw new NotImplementedException(); }
+ }
+
+ // FIXME: implement
+ public string ContentEncoding
+ {
+ get { throw new NotImplementedException(); }
+ set { throw new NotImplementedException(); }
+ }
+
+ public void Acknowledge()
+ {
+ // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+ // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+ if (_channel != null)
+ {
+ _channel.SendAcknowledgement(_messageNbr);
+ }
+ }
+
+ public IHeaders Headers
+ {
+ get { return new QpidHeaders(this); }
+ }
+
+ public abstract void ClearBody();
+
+ /// <summary>
+ /// Get a String representation of the body of the message. Used in the
+ /// toString() method which outputs this before message properties.
+ /// </summary>
+ /// <exception cref="QpidException"></exception>
+ public abstract string ToBodyString();
+
+ /// <summary>
+ /// Return the raw byte array that is used to populate the frame when sending
+ /// the message.
+ /// </summary>
+ /// <value>a byte array of message data</value>
+ public abstract byte[] Data
+ {
+ get;
+ set;
+ }
+
+ public abstract string MimeType
+ {
+ get;
+ }
+
+ public override string ToString()
+ {
+ try
+ {
+ StringBuilder buf = new StringBuilder("Body:\n");
+ buf.Append(ToBodyString());
+ buf.Append("\nQmsTimestamp: ").Append(Timestamp);
+ buf.Append("\nQmsExpiration: ").Append(Expiration);
+ buf.Append("\nQmsPriority: ").Append(Priority);
+ buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
+ buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName);
+ buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
+ buf.Append("\nAMQ message number: ").Append(_messageNbr);
+ buf.Append("\nProperties:");
+ if (ContentHeaderProperties.Headers == null)
+ {
+ buf.Append("<NONE>");
+ }
+ else
+ {
+ buf.Append(Headers.ToString());
+ }
+ return buf.ToString();
+ }
+ catch (Exception e)
+ {
+ return e.ToString();
+ }
+ }
+
+ public IFieldTable UnderlyingMessagePropertiesMap
+ {
+ get
+ {
+ return ContentHeaderProperties.Headers;
+ }
+ set
+ {
+ ContentHeaderProperties.Headers = (FieldTable)value;
+ }
+ }
+
+ public FieldTable PopulateHeadersFromMessageProperties()
+ {
+ if (ContentHeaderProperties.Headers == null)
+ {
+ return null;
+ }
+ else
+ {
+ //
+ // We need to convert every property into a String representation
+ // Note that type information is preserved in the property name
+ //
+ FieldTable table = new FieldTable();
+ foreach (DictionaryEntry entry in ContentHeaderProperties.Headers)
+ {
+ string propertyName = (string) entry.Key;
+ if (propertyName == null)
+ {
+ continue;
+ }
+ else
+ {
+ table[propertyName] = entry.Value.ToString();
+ }
+ }
+ return table;
+ }
+ }
+
+ /// <summary>
+ /// Get the AMQ message number assigned to this message
+ /// </summary>
+ /// <returns>the message number</returns>
+ public ulong MessageNbr
+ {
+ get
+ {
+ return _messageNbr;
+ }
+ set
+ {
+ _messageNbr = value;
+ }
+ }
+
+ public BasicContentHeaderProperties ContentHeaderProperties
+ {
+ get
+ {
+ return (BasicContentHeaderProperties) _contentHeaderProperties;
+ }
+ }
+ }
+
+ internal class QpidHeaders : IHeaders
+ {
+ public const char BOOLEAN_PROPERTY_PREFIX = 'B';
+ public const char BYTE_PROPERTY_PREFIX = 'b';
+ public const char SHORT_PROPERTY_PREFIX = 's';
+ public const char INT_PROPERTY_PREFIX = 'i';
+ public const char LONG_PROPERTY_PREFIX = 'l';
+ public const char FLOAT_PROPERTY_PREFIX = 'f';
+ public const char DOUBLE_PROPERTY_PREFIX = 'd';
+ public const char STRING_PROPERTY_PREFIX = 'S';
+
+ AbstractQmsMessage _message;
+
+ public QpidHeaders(AbstractQmsMessage message)
+ {
+ _message = message;
+ }
+
+ public bool Contains(string name)
+ {
+ CheckPropertyName(name);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return false;
+ }
+ else
+ {
+ // TODO: fix this
+ return _message.ContentHeaderProperties.Headers.Contains(STRING_PROPERTY_PREFIX + name);
+ }
+ }
+
+ public void Clear()
+ {
+ if (_message.ContentHeaderProperties.Headers != null)
+ {
+ _message.ContentHeaderProperties.Headers.Clear();
+ }
+ }
+
+ public string this[string name]
+ {
+ get
+ {
+ return GetString(name);
+ }
+ set
+ {
+ SetString(name, value);
+ }
+ }
+
+ public bool GetBoolean(string name)
+ {
+ CheckPropertyName(name);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return false;
+ }
+ else
+ {
+ object b = _message.ContentHeaderProperties.Headers[BOOLEAN_PROPERTY_PREFIX + name];
+
+ if (b == null)
+ {
+ return false;
+ }
+ else
+ {
+ return (bool)b;
+ }
+ }
+ }
+
+ public void SetBoolean(string name, bool b)
+ {
+ CheckPropertyName(name);
+ _message.ContentHeaderProperties.Headers[BOOLEAN_PROPERTY_PREFIX + name] = b;
+ }
+
+ public byte GetByte(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return 0;
+ }
+ else
+ {
+ object b = _message.ContentHeaderProperties.Headers[BYTE_PROPERTY_PREFIX + propertyName];
+ if (b == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return (byte)b;
+ }
+ }
+ }
+
+ public void SetByte(string propertyName, byte b)
+ {
+ CheckPropertyName(propertyName);
+ _message.ContentHeaderProperties.Headers[BYTE_PROPERTY_PREFIX + propertyName] = b;
+ }
+
+ public short GetShort(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return 0;
+ }
+ else
+ {
+ object s = _message.ContentHeaderProperties.Headers[SHORT_PROPERTY_PREFIX + propertyName];
+ if (s == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return (short)s;
+ }
+ }
+ }
+
+ public void SetShort(string propertyName, short i)
+ {
+ CheckPropertyName(propertyName);
+ _message.ContentHeaderProperties.Headers[SHORT_PROPERTY_PREFIX + propertyName] = i;
+ }
+
+ public int GetInt(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return 0;
+ }
+ else
+ {
+ object i = _message.ContentHeaderProperties.Headers[INT_PROPERTY_PREFIX + propertyName];
+ if (i == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return (int)i;
+ }
+ }
+ }
+
+ public void SetInt(string propertyName, int i)
+ {
+ CheckPropertyName(propertyName);
+ _message.ContentHeaderProperties.Headers[INT_PROPERTY_PREFIX + propertyName] = i;
+ }
+
+ public long GetLong(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return 0;
+ }
+ else
+ {
+ object l = _message.ContentHeaderProperties.Headers[LONG_PROPERTY_PREFIX + propertyName];
+ if (l == null)
+ {
+ // temp - the spec says do this but this throws a NumberFormatException
+ //return Long.valueOf(null).longValue();
+ return 0;
+ }
+ else
+ {
+ return (long)l;
+ }
+ }
+ }
+
+ public void SetLong(string propertyName, long l)
+ {
+ CheckPropertyName(propertyName);
+ _message.ContentHeaderProperties.Headers[LONG_PROPERTY_PREFIX + propertyName] = l;
+ }
+
+ public float GetFloat(String propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return 0;
+ }
+ else
+ {
+ object f = _message.ContentHeaderProperties.Headers[FLOAT_PROPERTY_PREFIX + propertyName];
+ if (f == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return (float)f;
+ }
+ }
+ }
+
+ public void SetFloat(string propertyName, float f)
+ {
+ CheckPropertyName(propertyName);
+ _message.ContentHeaderProperties.Headers[FLOAT_PROPERTY_PREFIX + propertyName] = f;
+ }
+
+ public double GetDouble(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return 0;
+ }
+ else
+ {
+ object d = _message.ContentHeaderProperties.Headers[DOUBLE_PROPERTY_PREFIX + propertyName];
+ if (d == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return (double)d;
+ }
+ }
+ }
+
+ public void SetDouble(string propertyName, double v)
+ {
+ CheckPropertyName(propertyName);
+ _message.ContentHeaderProperties.Headers[DOUBLE_PROPERTY_PREFIX + propertyName] = v;
+ }
+
+ public string GetString(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return null;
+ }
+ else
+ {
+ return (string)_message.ContentHeaderProperties.Headers[STRING_PROPERTY_PREFIX + propertyName];
+ }
+ }
+
+ public void SetString(string propertyName, string value)
+ {
+ CheckPropertyName(propertyName);
+ CreatePropertyMapIfRequired();
+ propertyName = STRING_PROPERTY_PREFIX + propertyName;
+ _message.ContentHeaderProperties.Headers[propertyName] = value;
+ }
+
+ private void CheckPropertyName(string propertyName)
+ {
+ if (propertyName == null)
+ {
+ throw new ArgumentException("Property name must not be null");
+ }
+ else if ("".Equals(propertyName))
+ {
+ throw new ArgumentException("Property name must not be the empty string");
+ }
+
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ _message.ContentHeaderProperties.Headers = new FieldTable();
+ }
+ }
+
+ private void CreatePropertyMapIfRequired()
+ {
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ _message.ContentHeaderProperties.Headers = new FieldTable();
+ }
+ }
+
+ public override string ToString()
+ {
+ StringBuilder buf = new StringBuilder("{");
+ int i = 0;
+ foreach (DictionaryEntry entry in _message.ContentHeaderProperties.Headers)
+ {
+ ++i;
+ if (i > 1)
+ {
+ buf.Append(", ");
+ }
+ string propertyName = (string)entry.Key;
+ if (propertyName == null)
+ {
+ buf.Append("\nInternal error: Property with NULL key defined");
+ }
+ else
+ {
+ buf.Append(propertyName.Substring(1));
+
+ buf.Append(" : ");
+
+ char typeIdentifier = propertyName[0];
+ buf.Append(typeIdentifierToName(typeIdentifier));
+ buf.Append(" = ").Append(entry.Value);
+ }
+ }
+ buf.Append("}");
+ return buf.ToString();
+ }
+
+ private static string typeIdentifierToName(char typeIdentifier)
+ {
+ switch (typeIdentifier)
+ {
+ case BOOLEAN_PROPERTY_PREFIX:
+ return "boolean";
+ case BYTE_PROPERTY_PREFIX:
+ return "byte";
+ case SHORT_PROPERTY_PREFIX:
+ return "short";
+ case INT_PROPERTY_PREFIX:
+ return "int";
+ case LONG_PROPERTY_PREFIX:
+ return "long";
+ case FLOAT_PROPERTY_PREFIX:
+ return "float";
+ case DOUBLE_PROPERTY_PREFIX:
+ return "double";
+ case STRING_PROPERTY_PREFIX:
+ return "string";
+ default:
+ return "unknown ( '" + typeIdentifier + "')";
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
new file mode 100644
index 0000000000..2e71bfc948
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.Collections;
+using Qpid.Framing;
+
+namespace Qpid.Client.Message
+{
+ public interface IMessageFactory
+ {
+ /// <summary>
+ /// Create a message
+ /// </summary>
+ /// <param name="messageNbr"></param>
+ /// <param name="redelivered"></param>
+ /// <param name="contentHeader"></param>
+ /// <param name="bodies"></param>
+ /// <returns></returns>
+ /// <exception cref="QpidMessagingException">if the message cannot be created</exception>
+ AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered,
+ ContentHeaderBody contentHeader,
+ IList bodies);
+
+ /// <summary>
+ /// Creates the message.
+ /// </summary>
+ /// <returns></returns>
+ /// <exception cref="QpidMessagingException">if the message cannot be created</exception>
+ AbstractQmsMessage CreateMessage();
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs b/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
new file mode 100644
index 0000000000..3965d531bb
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using Qpid.Framing;
+using Qpid.Messaging;
+
+namespace Qpid.Client.Message
+{
+ public class MessageFactoryRegistry
+ {
+ private readonly Hashtable _mimeToFactoryMap = new Hashtable();
+
+ public void RegisterFactory(string mimeType, IMessageFactory mf)
+ {
+ if (mf == null)
+ {
+ throw new ArgumentNullException("Message factory");
+ }
+ if (mimeType == null)
+ {
+ throw new ArgumentNullException("mf");
+ }
+ _mimeToFactoryMap[mimeType] = mf;
+ }
+
+ public void DeregisterFactory(string mimeType)
+ {
+ _mimeToFactoryMap.Remove(mimeType);
+ }
+
+ /// <summary>
+ /// Create a message. This looks up the MIME type from the content header and instantiates the appropriate
+ /// concrete message type.
+ /// </summary>
+ /// <param name="messageNbr">the AMQ message id</param>
+ /// <param name="redelivered">true if redelivered</param>
+ /// <param name="contentHeader">the content header that was received</param>
+ /// <param name="bodies">a list of ContentBody instances</param>
+ /// <returns>the message.</returns>
+ /// <exception cref="AMQException"/>
+ /// <exception cref="QpidException"/>
+ public AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered,
+ ContentHeaderBody contentHeader,
+ IList bodies)
+ {
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.Properties;
+
+ if (properties.ContentType == null)
+ {
+ properties.ContentType = "";
+ }
+
+ IMessageFactory mf = (IMessageFactory) _mimeToFactoryMap[properties.ContentType];
+ if (mf == null)
+ {
+ throw new AMQException("Unsupport MIME type of " + properties.ContentType);
+ }
+ else
+ {
+ return mf.CreateMessage(messageNbr, redelivered, contentHeader, bodies);
+ }
+ }
+
+ public AbstractQmsMessage CreateMessage(string mimeType)
+ {
+ if (mimeType == null)
+ {
+ throw new ArgumentNullException("Mime type must not be null");
+ }
+ IMessageFactory mf = (IMessageFactory) _mimeToFactoryMap[mimeType];
+ if (mf == null)
+ {
+ throw new AMQException("Unsupport MIME type of " + mimeType);
+ }
+ else
+ {
+ return mf.CreateMessage();
+ }
+ }
+
+ /// <summary>
+ /// Construct a new registry with the default message factories registered
+ /// </summary>
+ /// <returns>a message factory registry</returns>
+ public static MessageFactoryRegistry NewDefaultRegistry()
+ {
+ MessageFactoryRegistry mf = new MessageFactoryRegistry();
+ mf.RegisterFactory("text/plain", new QpidTextMessageFactory());
+ mf.RegisterFactory("text/xml", new QpidTextMessageFactory());
+ mf.RegisterFactory("application/octet-stream", new QpidBytesMessageFactory());
+ // TODO: use bytes message for default message factory
+ // MJA - just added this bit back in...
+ mf.RegisterFactory("", new QpidBytesMessageFactory());
+ return mf;
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs b/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
new file mode 100644
index 0000000000..b7911b44b9
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
@@ -0,0 +1,581 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using System.Text;
+using Qpid.Framing;
+using Qpid.Messaging;
+
+namespace Qpid.Client.Message
+{
+ public class QpidBytesMessage : AbstractQmsMessage, IBytesMessage
+ {
+ private const string MIME_TYPE = "application/octet-stream";
+
+ /// <summary>
+ /// The backingstore for the data
+ /// </summary>
+ private MemoryStream _dataStream;
+
+ private int _bodyLength;
+
+ private BinaryReader _reader;
+
+ private BinaryWriter _writer;
+
+ public QpidBytesMessage() : this(null)
+ {
+ }
+
+ /// <summary>
+ /// Construct a bytes message with existing data.
+ /// </summary>
+ /// <param name="data">if data is not null, the message is immediately in read only mode. if data is null, it is in
+ /// write-only mode</param>
+ QpidBytesMessage(byte[] data) : base()
+ {
+ // superclass constructor has instantiated a content header at this point
+ ContentHeaderProperties.ContentType = MIME_TYPE;
+ if (data == null)
+ {
+ _dataStream = new MemoryStream();
+ _writer = new BinaryWriter(_dataStream);
+ }
+ else
+ {
+ _dataStream = new MemoryStream(data);
+ _bodyLength = data.Length;
+ _reader = new BinaryReader(_dataStream);
+ }
+ }
+
+ public QpidBytesMessage(ulong messageNbr, byte[] data, ContentHeaderBody contentHeader)
+ // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
+ : base(messageNbr, (BasicContentHeaderProperties) contentHeader.Properties)
+ {
+ ContentHeaderProperties.ContentType = MIME_TYPE;
+ _dataStream = new MemoryStream(data);
+ _bodyLength = data.Length;
+ _reader = new BinaryReader(_dataStream);
+ }
+
+ public override void ClearBody()
+ {
+ if (_reader != null)
+ {
+ _reader.Close();
+ _reader = null;
+ }
+ _dataStream = new MemoryStream();
+ _bodyLength = 0;
+
+ _writer = new BinaryWriter(_dataStream);
+ }
+
+ public override string ToBodyString()
+ {
+ CheckReadable();
+ try
+ {
+ return GetText();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString());
+ }
+ }
+
+ private string GetText()
+ {
+ if (_dataStream != null)
+ {
+ // we cannot just read the underlying buffer since it may be larger than the amount of
+ // "filled" data. Length is not the same as Capacity.
+ byte[] data = new byte[_dataStream.Length];
+ _dataStream.Read(data, 0, (int)_dataStream.Length);
+ return Encoding.UTF8.GetString(data);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public override byte[] Data
+ {
+ get
+ {
+ if (_dataStream == null)
+ {
+ return null;
+ }
+ else
+ {
+ byte[] data = new byte[_dataStream.Length];
+ _dataStream.Position = 0;
+ _dataStream.Read(data, 0, (int) _dataStream.Length);
+ return data;
+ }
+ }
+ set
+ {
+ throw new NotSupportedException("Cannot set data payload except during construction");
+ }
+ }
+
+ public override string MimeType
+ {
+ get
+ {
+ return MIME_TYPE;
+ }
+ }
+
+ public long BodyLength
+ {
+ get
+ {
+ CheckReadable();
+ return _bodyLength;
+ }
+ }
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <exception cref="MessageNotReadableException">if the message is in write mode</exception>
+ private void CheckReadable()
+ {
+ if (_reader == null)
+ {
+ throw new MessageNotReadableException("You need to call reset() to make the message readable");
+ }
+ }
+
+ private void CheckWritable()
+ {
+ if (_reader != null)
+ {
+ throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
+ }
+ }
+
+ public bool ReadBoolean()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadBoolean();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public byte ReadByte()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadByte();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public short ReadSignedByte()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadSByte();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public short ReadShort()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadInt16();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public char ReadChar()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadChar();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public int ReadInt()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadInt32();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public long ReadLong()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadInt64();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public float ReadFloat()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadSingle();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public double ReadDouble()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadDouble();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public string ReadUTF()
+ {
+ CheckReadable();
+ try
+ {
+ byte[] data = _reader.ReadBytes((int)_dataStream.Length);
+ return Encoding.UTF8.GetString(data);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public int ReadBytes(byte[] bytes)
+ {
+ if (bytes == null)
+ {
+ throw new ArgumentNullException("bytes");
+ }
+ CheckReadable();
+ try
+ {
+ return _reader.Read(bytes, 0, bytes.Length);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public int ReadBytes(byte[] bytes, int count)
+ {
+ CheckReadable();
+ if (bytes == null)
+ {
+ throw new ArgumentNullException("bytes");
+ }
+ if (count < 0)
+ {
+ throw new ArgumentOutOfRangeException("count must be >= 0");
+ }
+ if (count > bytes.Length)
+ {
+ count = bytes.Length;
+ }
+
+ try
+ {
+ return _reader.Read(bytes, 0, count);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteBoolean(bool b)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(b);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteByte(byte b)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(b);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteShort(short i)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(i);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteChar(char c)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(c);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteSignedByte(short value)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(value);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteDouble(double value)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(value);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteFloat(float value)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(value);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteInt(int value)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(value);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteLong(long value)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(value);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void Write(int i)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(i);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void Write(long l)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(l);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void Write(float v)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(v);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void Write(double v)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(v);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteUTF(string value)
+ {
+ CheckWritable();
+ try
+ {
+ byte[] encodedData = Encoding.UTF8.GetBytes(value);
+ _writer.Write(encodedData);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteBytes(byte[] bytes)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(bytes);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteBytes(byte[] bytes, int offset, int length)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(bytes, offset, length);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void Reset()
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Close();
+ _writer = null;
+ _reader = new BinaryReader(_dataStream);
+ _bodyLength = (int) _dataStream.Length;
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
new file mode 100644
index 0000000000..3f2a6c531f
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using Qpid.Framing;
+
+namespace Qpid.Client.Message
+{
+ public class QpidBytesMessageFactory : AbstractQmsMessageFactory
+ {
+ protected override AbstractQmsMessage CreateMessageWithBody(ulong messageNbr,
+ ContentHeaderBody contentHeader,
+ IList bodies)
+ {
+ byte[] data;
+
+ // we optimise the non-fragmented case to avoid copying
+ if (bodies != null && bodies.Count == 1)
+ {
+ data = ((ContentBody)bodies[0]).Payload;
+ }
+ else
+ {
+ data = new byte[(long)contentHeader.BodySize];
+ int currentPosition = 0;
+ foreach (ContentBody cb in bodies)
+ {
+ Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length);
+ currentPosition += cb.Payload.Length;
+ }
+ }
+
+ return new QpidBytesMessage(messageNbr, data, contentHeader);
+ }
+
+ public override AbstractQmsMessage CreateMessage()
+ {
+ return new QpidBytesMessage();
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs b/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
new file mode 100644
index 0000000000..4c16038d4b
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
@@ -0,0 +1,137 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using Qpid.Framing;
+using Qpid.Messaging;
+
+namespace Qpid.Client.Message
+{
+ public class QpidTextMessage : AbstractQmsMessage, ITextMessage
+ {
+ private const string MIME_TYPE = "text/plain";
+
+ private byte[] _data;
+
+ private string _decodedValue;
+
+ public QpidTextMessage() : this(null, null)
+ {
+ }
+
+ public QpidTextMessage(byte[] data, String encoding) : base()
+ {
+ // the superclass has instantied a content header at this point
+ ContentHeaderProperties.ContentType= MIME_TYPE;
+ _data = data;
+ ContentHeaderProperties.Encoding = encoding;
+ }
+
+ public QpidTextMessage(ulong messageNbr, byte[] data, BasicContentHeaderProperties contentHeader)
+ : base(messageNbr, contentHeader)
+ {
+ contentHeader.ContentType = MIME_TYPE;
+ _data = data;
+ }
+
+ public QpidTextMessage(byte[] data) : this(data, null)
+ {
+ }
+
+ public QpidTextMessage(string text)
+ {
+ Text = text;
+ }
+
+ public override void ClearBody()
+ {
+ _data = null;
+ _decodedValue = null;
+ }
+
+ public override string ToBodyString()
+ {
+ return Text;
+ }
+
+ public override byte[] Data
+ {
+ get
+ {
+ return _data;
+ }
+ set
+ {
+ _data = value;
+ }
+ }
+
+ public override string MimeType
+ {
+ get
+ {
+ return MIME_TYPE;
+ }
+ }
+
+ public string Text
+ {
+ get
+ {
+ if (_data == null && _decodedValue == null)
+ {
+ return null;
+ }
+ else if (_decodedValue != null)
+ {
+ return _decodedValue;
+ }
+ else
+ {
+ if (ContentHeaderProperties.Encoding != null)
+ {
+ // throw ArgumentException if the encoding is not supported
+ _decodedValue = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetString(_data);
+ }
+ else
+ {
+ _decodedValue = Encoding.Default.GetString(_data);
+ }
+ return _decodedValue;
+ }
+ }
+
+ set
+ {
+ if (ContentHeaderProperties.Encoding == null)
+ {
+ _data = Encoding.Default.GetBytes(value);
+ }
+ else
+ {
+ // throw ArgumentException if the encoding is not supported
+ _data = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetBytes(value);
+ }
+ _decodedValue = value;
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
new file mode 100644
index 0000000000..5457b2301e
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using Qpid.Framing;
+
+namespace Qpid.Client.Message
+{
+ public class QpidTextMessageFactory : AbstractQmsMessageFactory
+ {
+ protected override AbstractQmsMessage CreateMessageWithBody(ulong messageNbr, ContentHeaderBody contentHeader,
+ IList bodies)
+ {
+ byte[] data;
+
+ // we optimise the non-fragmented case to avoid copying
+ if (bodies != null && bodies.Count == 1)
+ {
+ data = ((ContentBody)bodies[0]).Payload;
+ }
+ else
+ {
+ data = new byte[(int)contentHeader.BodySize];
+ int currentPosition = 0;
+ foreach (ContentBody cb in bodies)
+ {
+ Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length);
+ currentPosition += cb.Payload.Length;
+ }
+ }
+
+ return new QpidTextMessage(messageNbr, data, (BasicContentHeaderProperties)contentHeader.Properties);
+ }
+
+
+ public override AbstractQmsMessage CreateMessage()
+ {
+ return new QpidTextMessage();
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs b/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs
new file mode 100644
index 0000000000..679114d105
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using log4net;
+
+namespace Qpid.Client.Message
+{
+ /// <summary>
+ /// Raised when a message body is received unexpectedly by the client. This typically occurs when the
+ /// length of bodies received does not match with the declared length in the content header.
+ /// </summary>
+ public class UnexpectedBodyReceivedException : AMQException
+ {
+ public UnexpectedBodyReceivedException(ILog logger, string msg, Exception t)
+ : base(logger, msg, t)
+ {
+ }
+
+ public UnexpectedBodyReceivedException(ILog logger, string msg)
+ : base(logger, msg)
+ {
+ }
+
+ public UnexpectedBodyReceivedException(ILog logger, int errorCode, string msg)
+ : base(logger, errorCode, msg)
+ {
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs b/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs
new file mode 100644
index 0000000000..cb4e64718b
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.Collections;
+using Qpid.Framing;
+
+namespace Qpid.Client.Message
+{
+ public class UnprocessedMessage
+ {
+ private ulong _bytesReceived = 0;
+
+ public BasicDeliverBody DeliverBody;
+ public BasicReturnBody BounceBody;
+ public ushort ChannelId;
+ public ContentHeaderBody ContentHeader;
+
+ /// <summary>
+ /// List of ContentBody instances. Due to fragmentation you don't know how big this will be in general
+ /// </summary>
+ /// TODO: write and use linked list class
+ public IList Bodies = new ArrayList();
+
+ public void ReceiveBody(ContentBody body)
+ {
+ Bodies.Add(body);
+ if (body.Payload != null)
+ {
+ _bytesReceived += (uint)body.Payload.Length;
+ }
+ }
+
+ public bool IsAllBodyDataReceived()
+ {
+ return _bytesReceived == ContentHeader.BodySize;
+ }
+ }
+}
+