summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
diff options
context:
space:
mode:
Diffstat (limited to 'dotnet/Qpid.Client/Client/BasicMessageProducer.cs')
-rw-r--r--dotnet/Qpid.Client/Client/BasicMessageProducer.cs578
1 files changed, 341 insertions, 237 deletions
diff --git a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
index 759ffd62e3..fd430694df 100644
--- a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
+++ b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
@@ -21,255 +21,359 @@
using System;
using System.Threading;
using log4net;
+using Qpid.Buffer;
using Qpid.Client.Message;
using Qpid.Messaging;
+using Qpid.Framing;
namespace Qpid.Client
{
- public class BasicMessageProducer : Closeable, IMessagePublisher
- {
- protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
-
- /// <summary>
- /// If true, messages will not get a timestamp.
- /// </summary>
- private bool _disableTimestamps;
-
- /// <summary>
- /// Priority of messages created by this producer.
- /// </summary>
- private int _messagePriority;
-
- /// <summary>
- /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
- ///
- private long _timeToLive;
-
- /// <summary>
- /// Delivery mode used for this producer.
- /// </summary>
- private DeliveryMode _deliveryMode;
-
- private bool _immediate;
- private bool _mandatory;
-
- string _exchangeName;
- string _routingKey;
-
- /// <summary>
- /// Default encoding used for messages produced by this producer.
- /// </summary>
- private string _encoding;
-
- /// <summary>
- /// Default encoding used for message produced by this producer.
- /// </summary>
- private string _mimeType;
-
- /// <summary>
- /// True if this producer was created from a transacted session
- /// </summary>
- private bool _transacted;
-
- private ushort _channelId;
-
- /// <summary>
- /// This is an id generated by the session and is used to tie individual producers to the session. This means we
- /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
- /// to the session so that when an error is propagated to the session it can close the producer (meaning that
- /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
- /// </summary>
- private long _producerId;
-
- /// <summary>
- /// The session used to create this producer
- /// </summary>
- private AmqChannel _channel;
-
- /// <summary>
- /// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue
- /// </summary>
- protected const bool DEFAULT_IMMEDIATE = false;
-
- /// <summary>
- /// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is
- /// connected to the exchange for the message
- /// </summary>
- protected const bool DEFAULT_MANDATORY = true;
-
- public BasicMessageProducer(string exchangeName, string routingKey,
- bool transacted,
- ushort channelId,
- AmqChannel channel,
- long producerId,
- DeliveryMode deliveryMode,
- long timeToLive,
- bool immediate,
- bool mandatory,
- int priority)
- {
- _exchangeName = exchangeName;
- _routingKey = routingKey;
- _transacted = transacted;
- _channelId = channelId;
- _channel = channel;
- _producerId = producerId;
- _deliveryMode = deliveryMode;
- _timeToLive = timeToLive;
- _immediate = immediate;
- _mandatory = mandatory;
- _messagePriority = priority;
-
- _channel.RegisterProducer(producerId, this);
- }
-
-
- #region IMessagePublisher Members
-
- public DeliveryMode DeliveryMode
- {
- get
- {
- CheckNotClosed();
- return _deliveryMode;
- }
- set
- {
- CheckNotClosed();
- _deliveryMode = value;
- }
- }
-
- public string ExchangeName
- {
- get { return _exchangeName; }
- }
-
- public string RoutingKey
- {
- get { return _routingKey; }
- }
-
- public bool DisableMessageID
- {
- get
- {
- throw new Exception("The method or operation is not implemented.");
- }
- set
- {
- throw new Exception("The method or operation is not implemented.");
- }
- }
-
- public bool DisableMessageTimestamp
- {
- get
- {
- CheckNotClosed();
- return _disableTimestamps;
- }
- set
- {
- CheckNotClosed();
- _disableTimestamps = value;
- }
- }
-
- public int Priority
- {
- get
- {
- CheckNotClosed();
- return _messagePriority;
- }
- set
- {
- CheckNotClosed();
- if (value < 0 || value > 9)
- {
- throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
- }
- _messagePriority = value;
- }
- }
-
- public override void Close()
- {
- _logger.Debug("Closing producer " + this);
- Interlocked.Exchange(ref _closed, CLOSED);
- _channel.DeregisterProducer(_producerId);
- }
-
- public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
- {
+ public class BasicMessageProducer : Closeable, IMessagePublisher
+ {
+ protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
+
+ /// <summary>
+ /// If true, messages will not get a timestamp.
+ /// </summary>
+ private bool _disableTimestamps;
+
+ /// <summary>
+ /// Priority of messages created by this producer.
+ /// </summary>
+ private int _messagePriority;
+
+ /// <summary>
+ /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
+ ///
+ private long _timeToLive;
+
+ /// <summary>
+ /// Delivery mode used for this producer.
+ /// </summary>
+ private DeliveryMode _deliveryMode;
+
+ private bool _immediate;
+ private bool _mandatory;
+
+ string _exchangeName;
+ string _routingKey;
+
+ /// <summary>
+ /// Default encoding used for messages produced by this producer.
+ /// </summary>
+ private string _encoding;
+
+ /// <summary>
+ /// Default encoding used for message produced by this producer.
+ /// </summary>
+ private string _mimeType;
+
+ /// <summary>
+ /// True if this producer was created from a transacted session
+ /// </summary>
+ private bool _transacted;
+
+ private ushort _channelId;
+
+ /// <summary>
+ /// This is an id generated by the session and is used to tie individual producers to the session. This means we
+ /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
+ /// to the session so that when an error is propagated to the session it can close the producer (meaning that
+ /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
+ /// </summary>
+ private long _producerId;
+
+ /// <summary>
+ /// The session used to create this producer
+ /// </summary>
+ private AmqChannel _channel;
+
+ /// <summary>
+ /// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue
+ /// </summary>
+ protected const bool DEFAULT_IMMEDIATE = false;
+
+ /// <summary>
+ /// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is
+ /// connected to the exchange for the message
+ /// </summary>
+ protected const bool DEFAULT_MANDATORY = true;
+
+ public BasicMessageProducer(string exchangeName, string routingKey,
+ bool transacted,
+ ushort channelId,
+ AmqChannel channel,
+ long producerId,
+ DeliveryMode deliveryMode,
+ long timeToLive,
+ bool immediate,
+ bool mandatory,
+ int priority)
+ {
+ _exchangeName = exchangeName;
+ _routingKey = routingKey;
+ _transacted = transacted;
+ _channelId = channelId;
+ _channel = channel;
+ _producerId = producerId;
+ _deliveryMode = deliveryMode;
+ _timeToLive = timeToLive;
+ _immediate = immediate;
+ _mandatory = mandatory;
+ _messagePriority = priority;
+
+ _channel.RegisterProducer(producerId, this);
+ }
+
+
+ #region IMessagePublisher Members
+
+ public DeliveryMode DeliveryMode
+ {
+ get
+ {
CheckNotClosed();
- SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive, DEFAULT_MANDATORY,
- DEFAULT_IMMEDIATE);
- }
-
- public void Send(IMessage msg)
- {
+ return _deliveryMode;
+ }
+ set
+ {
CheckNotClosed();
- SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
- DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
- }
-
- // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
- // to facilitate publishing messages to potentially non-existent recipients.
- public void Send(IMessage msg, bool mandatory)
- {
+ _deliveryMode = value;
+ }
+ }
+
+ public string ExchangeName
+ {
+ get { return _exchangeName; }
+ }
+
+ public string RoutingKey
+ {
+ get { return _routingKey; }
+ }
+
+ public bool DisableMessageID
+ {
+ get
+ {
+ throw new Exception("The method or operation is not implemented.");
+ }
+ set
+ {
+ throw new Exception("The method or operation is not implemented.");
+ }
+ }
+
+ public bool DisableMessageTimestamp
+ {
+ get
+ {
CheckNotClosed();
- SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
- mandatory, DEFAULT_IMMEDIATE);
- }
-
- public long TimeToLive
- {
- get
- {
- CheckNotClosed();
- return _timeToLive;
- }
- set
+ return _disableTimestamps;
+ }
+ set
+ {
+ CheckNotClosed();
+ _disableTimestamps = value;
+ }
+ }
+
+ public int Priority
+ {
+ get
+ {
+ CheckNotClosed();
+ return _messagePriority;
+ }
+ set
+ {
+ CheckNotClosed();
+ if ( value < 0 || value > 9 )
{
- CheckNotClosed();
- if (value < 0)
- {
- throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
- }
- _timeToLive = value;
+ throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
}
- }
-
- #endregion
-
- private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
- {
- _channel.BasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, priority, timeToLive, _disableTimestamps);
- }
-
- public string MimeType
- {
- set
+ _messagePriority = value;
+ }
+ }
+
+ public override void Close()
+ {
+ _logger.Debug("Closing producer " + this);
+ Interlocked.Exchange(ref _closed, CLOSED);
+ _channel.DeregisterProducer(_producerId);
+ }
+
+ public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
+ {
+ CheckNotClosed();
+ SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive, DEFAULT_MANDATORY,
+ DEFAULT_IMMEDIATE);
+ }
+
+ public void Send(IMessage msg)
+ {
+ CheckNotClosed();
+ SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
+ DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
+ }
+
+ // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
+ // to facilitate publishing messages to potentially non-existent recipients.
+ public void Send(IMessage msg, bool mandatory)
+ {
+ CheckNotClosed();
+ SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
+ mandatory, DEFAULT_IMMEDIATE);
+ }
+
+ public long TimeToLive
+ {
+ get
+ {
+ CheckNotClosed();
+ return _timeToLive;
+ }
+ set
+ {
+ CheckNotClosed();
+ if ( value < 0 )
{
- CheckNotClosed();
- _mimeType = value;
+ throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
}
- }
+ _timeToLive = value;
+ }
+ }
- public string Encoding
- {
- set
- {
- CheckNotClosed();
- _encoding = value;
- }
- }
+ #endregion
- public void Dispose()
- {
- Close();
- }
- }
+ public string MimeType
+ {
+ set
+ {
+ CheckNotClosed();
+ _mimeType = value;
+ }
+ }
+
+ public string Encoding
+ {
+ set
+ {
+ CheckNotClosed();
+ _encoding = value;
+ }
+ }
+
+ public void Dispose()
+ {
+ Close();
+ }
+
+ #region Message Publishing
+
+ private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
+ {
+ // todo: handle session access ticket
+ AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(
+ _channel.ChannelId, 0, exchangeName,
+ routingKey, mandatory, immediate
+ );
+
+ // fix message properties
+ if ( !_disableTimestamps )
+ {
+ message.Timestamp = DateTime.UtcNow.Ticks;
+ message.Expiration = message.Timestamp + timeToLive;
+ } else
+ {
+ message.Expiration = 0;
+ }
+ message.DeliveryMode = deliveryMode;
+ message.Priority = (byte)priority;
+
+ ByteBuffer payload = message.Data;
+ int payloadLength = payload.Limit;
+
+ ContentBody[] contentBodies = CreateContentBodies(payload);
+ AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
+ for ( int i = 0; i < contentBodies.Length; i++ )
+ {
+ frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
+ }
+ if ( contentBodies.Length > 0 && _logger.IsDebugEnabled )
+ {
+ _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+
+ // weight argument of zero indicates no child content headers, just bodies
+ AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(
+ _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0,
+ message.ContentHeaderProperties, (uint)payloadLength
+ );
+ if ( _logger.IsDebugEnabled )
+ {
+ _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+
+ frames[0] = publishFrame;
+ frames[1] = contentHeaderFrame;
+ CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+
+ lock ( _channel.Connection.FailoverMutex )
+ {
+ _channel.Connection.ProtocolWriter.Write(compositeFrame);
+ }
+ }
+
+
+ /// <summary>
+ /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+ /// maximum frame size.
+ /// </summary>
+ /// <param name="payload"></param>
+ /// <returns>return the array of content bodies</returns>
+ private ContentBody[] CreateContentBodies(ByteBuffer payload)
+ {
+ if ( payload == null )
+ {
+ return null;
+ } else if ( payload.Remaining == 0 )
+ {
+ return new ContentBody[0];
+ }
+ // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+ // (0xCE byte).
+ int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1);
+ int frameCount = CalculateContentBodyFrames(payload);
+ ContentBody[] bodies = new ContentBody[frameCount];
+ for ( int i = 0; i < frameCount; i++ )
+ {
+ int length = (payload.Remaining >= framePayloadMax)
+ ? framePayloadMax : payload.Remaining;
+ bodies[i] = new ContentBody(payload, (uint)length);
+ }
+ return bodies;
+ }
+
+ private int CalculateContentBodyFrames(ByteBuffer payload)
+ {
+ // we substract one from the total frame maximum size to account
+ // for the end of frame marker in a body frame
+ // (0xCE byte).
+ int frameCount;
+ if ( (payload == null) || (payload.Remaining == 0) )
+ {
+ frameCount = 0;
+ } else
+ {
+ int dataLength = payload.Remaining;
+ int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1;
+ int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
+ frameCount = (int)(dataLength / framePayloadMax) + lastFrame;
+ }
+
+ return frameCount;
+ }
+ #endregion // Message Publishing
+ }
}