diff options
Diffstat (limited to 'dotnet/Qpid.Client/Client/BasicMessageProducer.cs')
-rw-r--r-- | dotnet/Qpid.Client/Client/BasicMessageProducer.cs | 405 |
1 files changed, 0 insertions, 405 deletions
diff --git a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs deleted file mode 100644 index f33afc452e..0000000000 --- a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs +++ /dev/null @@ -1,405 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Threading; -using log4net; -using Apache.Qpid.Buffer; -using Apache.Qpid.Client.Message; -using Apache.Qpid.Messaging; -using Apache.Qpid.Framing; - -namespace Apache.Qpid.Client -{ - public class BasicMessageProducer : Closeable, IMessagePublisher - { - protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer)); - - /// <summary> - /// If true, messages will not get a timestamp. - /// </summary> - private bool _disableTimestamps; - - /// <summary> - /// Priority of messages created by this producer. - /// </summary> - private int _messagePriority; - - /// <summary> - /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. - /// </summary> - private long _timeToLive; - - /// <summary> - /// Delivery mode used for this producer. - /// </summary> - private DeliveryMode _deliveryMode; - - private bool _immediate; - private bool _mandatory; - - string _exchangeName; - string _routingKey; - - /// <summary> - /// Default encoding used for messages produced by this producer. - /// </summary> - private string _encoding; - - /// <summary> - /// Default encoding used for message produced by this producer. - /// </summary> - private string _mimeType; - - /// <summary> - /// True if this producer was created from a transacted session - /// </summary> - private bool _transacted; - - private ushort _channelId; - - /// <summary> - /// This is an id generated by the session and is used to tie individual producers to the session. This means we - /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers - /// to the session so that when an error is propagated to the session it can close the producer (meaning that - /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently). - /// </summary> - private long _producerId; - - /// <summary> - /// The session used to create this producer - /// </summary> - private AmqChannel _channel; - - public BasicMessageProducer(string exchangeName, string routingKey, - bool transacted, - ushort channelId, - AmqChannel channel, - long producerId, - DeliveryMode deliveryMode, - long timeToLive, - bool immediate, - bool mandatory, - int priority) - { - _exchangeName = exchangeName; - _routingKey = routingKey; - _transacted = transacted; - _channelId = channelId; - _channel = channel; - _producerId = producerId; - _deliveryMode = deliveryMode; - _timeToLive = timeToLive; - _immediate = immediate; - _mandatory = mandatory; - _messagePriority = priority; - - _channel.RegisterProducer(producerId, this); - } - - - #region IMessagePublisher Members - - public DeliveryMode DeliveryMode - { - get - { - CheckNotClosed(); - return _deliveryMode; - } - set - { - CheckNotClosed(); - _deliveryMode = value; - } - } - - public string ExchangeName - { - get { return _exchangeName; } - } - - public string RoutingKey - { - get { return _routingKey; } - } - - public bool DisableMessageID - { - get - { - throw new Exception("The method or operation is not implemented."); - } - set - { - throw new Exception("The method or operation is not implemented."); - } - } - - public bool DisableMessageTimestamp - { - get - { - CheckNotClosed(); - return _disableTimestamps; - } - set - { - CheckNotClosed(); - _disableTimestamps = value; - } - } - - public int Priority - { - get - { - CheckNotClosed(); - return _messagePriority; - } - set - { - CheckNotClosed(); - if ( value < 0 || value > 9 ) - { - throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9"); - } - _messagePriority = value; - } - } - - public override void Close() - { - _logger.Debug("Closing producer " + this); - Interlocked.Exchange(ref _closed, CLOSED); - _channel.DeregisterProducer(_producerId); - } - - public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive) - { - CheckNotClosed(); - SendImpl( - _exchangeName, - _routingKey, - (AbstractQmsMessage)msg, - deliveryMode, - priority, - (uint)timeToLive, - _mandatory, - _immediate - ); - } - - public void Send(IMessage msg) - { - CheckNotClosed(); - SendImpl( - _exchangeName, - _routingKey, - (AbstractQmsMessage)msg, - _deliveryMode, - _messagePriority, - (uint)_timeToLive, - _mandatory, - _immediate - ); - } - - // This is a short-term hack (knowing that this code will be re-vamped sometime soon) - // to facilitate publishing messages to potentially non-existent recipients. - public void Send(IMessage msg, bool mandatory) - { - CheckNotClosed(); - SendImpl( - _exchangeName, - _routingKey, - (AbstractQmsMessage)msg, - _deliveryMode, - _messagePriority, - (uint)_timeToLive, - mandatory, - _immediate - ); - } - - public long TimeToLive - { - get - { - CheckNotClosed(); - return _timeToLive; - } - set - { - CheckNotClosed(); - if ( value < 0 ) - { - throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value); - } - _timeToLive = value; - } - } - - #endregion - - public string MimeType - { - get - { - CheckNotClosed(); - return _mimeType; - } - set - { - CheckNotClosed(); - _mimeType = value; - } - } - - public string Encoding - { - get - { - CheckNotClosed(); - return _encoding; - } - set - { - CheckNotClosed(); - _encoding = value; - } - } - - public void Dispose() - { - Close(); - } - - #region Message Publishing - - private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate) - { - // todo: handle session access ticket - AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame( - _channel.ChannelId, 0, exchangeName, - routingKey, mandatory, immediate - ); - - // fix message properties - if ( !_disableTimestamps ) - { - message.Timestamp = DateTime.UtcNow.Ticks; - if (timeToLive != 0) - { - message.Expiration = message.Timestamp + timeToLive; - } - } else - { - message.Expiration = 0; - } - message.DeliveryMode = deliveryMode; - message.Priority = (byte)priority; - - ByteBuffer payload = message.Data; - int payloadLength = payload.Limit; - - ContentBody[] contentBodies = CreateContentBodies(payload); - AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length]; - for ( int i = 0; i < contentBodies.Length; i++ ) - { - frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]); - } - if ( contentBodies.Length > 0 && _logger.IsDebugEnabled ) - { - _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); - } - - // weight argument of zero indicates no child content headers, just bodies - AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame( - _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0, - message.ContentHeaderProperties, (uint)payloadLength - ); - if ( _logger.IsDebugEnabled ) - { - _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); - } - - frames[0] = publishFrame; - frames[1] = contentHeaderFrame; - CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); - - lock ( _channel.Connection.FailoverMutex ) - { - _channel.Connection.ProtocolWriter.Write(compositeFrame); - } - } - - - /// <summary> - /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated - /// maximum frame size. - /// </summary> - /// <param name="payload"></param> - /// <returns>return the array of content bodies</returns> - private ContentBody[] CreateContentBodies(ByteBuffer payload) - { - if ( payload == null ) - { - return null; - } else if ( payload.Remaining == 0 ) - { - return new ContentBody[0]; - } - // we substract one from the total frame maximum size to account for the end of frame marker in a body frame - // (0xCE byte). - int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1); - int frameCount = CalculateContentBodyFrames(payload); - ContentBody[] bodies = new ContentBody[frameCount]; - for ( int i = 0; i < frameCount; i++ ) - { - int length = (payload.Remaining >= framePayloadMax) - ? framePayloadMax : payload.Remaining; - bodies[i] = new ContentBody(payload, (uint)length); - } - return bodies; - } - - private int CalculateContentBodyFrames(ByteBuffer payload) - { - // we substract one from the total frame maximum size to account - // for the end of frame marker in a body frame - // (0xCE byte). - int frameCount; - if ( (payload == null) || (payload.Remaining == 0) ) - { - frameCount = 0; - } else - { - int dataLength = payload.Remaining; - int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1; - int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; - frameCount = (int)(dataLength / framePayloadMax) + lastFrame; - } - - return frameCount; - } - #endregion // Message Publishing - } -} |