diff options
| author | Steven Shaw <steshaw@apache.org> | 2006-11-25 22:04:39 +0000 |
|---|---|---|
| committer | Steven Shaw <steshaw@apache.org> | 2006-11-25 22:04:39 +0000 |
| commit | 7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6 (patch) | |
| tree | 3122525268281cd9df870e0a9cb309ee7410a424 /dotnet/Qpid.Client/Client/BasicMessageProducer.cs | |
| parent | 8f32ca18d5281eaa5baafa769c99fa70c830b14f (diff) | |
| download | qpid-python-7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6.tar.gz | |
QPID-128 Initial import of the C# sources.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@479211 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client/BasicMessageProducer.cs')
| -rw-r--r-- | dotnet/Qpid.Client/Client/BasicMessageProducer.cs | 275 |
1 files changed, 275 insertions, 0 deletions
diff --git a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs new file mode 100644 index 0000000000..a93d2db675 --- /dev/null +++ b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs @@ -0,0 +1,275 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using Qpid.Client.Message; +using Qpid.Messaging; + +namespace Qpid.Client +{ + public class BasicMessageProducer : Closeable, IMessagePublisher + { + protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer)); + + /// <summary> + /// If true, messages will not get a timestamp. + /// </summary> + private bool _disableTimestamps; + + /// <summary> + /// Priority of messages created by this producer. + /// </summary> + private int _messagePriority; + + /// <summary> + /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. + /// + private long _timeToLive; + + /// <summary> + /// Delivery mode used for this producer. + /// </summary> + private DeliveryMode _deliveryMode; + + private bool _immediate; + private bool _mandatory; + + string _exchangeName; + string _routingKey; + + /// <summary> + /// Default encoding used for messages produced by this producer. + /// </summary> + private string _encoding; + + /// <summary> + /// Default encoding used for message produced by this producer. + /// </summary> + private string _mimeType; + + /// <summary> + /// True if this producer was created from a transacted session + /// </summary> + private bool _transacted; + + private ushort _channelId; + + /// <summary> + /// This is an id generated by the session and is used to tie individual producers to the session. This means we + /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers + /// to the session so that when an error is propagated to the session it can close the producer (meaning that + /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently). + /// </summary> + private long _producerId; + + /// <summary> + /// The session used to create this producer + /// </summary> + private AmqChannel _channel; + + /// <summary> + /// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue + /// </summary> + protected const bool DEFAULT_IMMEDIATE = false; + + /// <summary> + /// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is + /// connected to the exchange for the message + /// </summary> + protected const bool DEFAULT_MANDATORY = true; + + public BasicMessageProducer(string exchangeName, string routingKey, + bool transacted, + ushort channelId, + AmqChannel channel, + long producerId, + DeliveryMode deliveryMode, + long timeToLive, + bool immediate, + bool mandatory, + int priority) + { + _exchangeName = exchangeName; + _routingKey = routingKey; + _transacted = transacted; + _channelId = channelId; + _channel = channel; + _producerId = producerId; + _deliveryMode = deliveryMode; + _timeToLive = timeToLive; + _immediate = immediate; + _mandatory = mandatory; + _messagePriority = priority; + + _channel.RegisterProducer(producerId, this); + } + + + #region IMessagePublisher Members + + public DeliveryMode DeliveryMode + { + get + { + CheckNotClosed(); + return _deliveryMode; + } + set + { + CheckNotClosed(); + _deliveryMode = value; + } + } + + public string ExchangeName + { + get { return _exchangeName; } + } + + public string RoutingKey + { + get { return _routingKey; } + } + + public bool DisableMessageID + { + get + { + throw new Exception("The method or operation is not implemented."); + } + set + { + throw new Exception("The method or operation is not implemented."); + } + } + + public bool DisableMessageTimestamp + { + get + { + CheckNotClosed(); + return _disableTimestamps; + } + set + { + CheckNotClosed(); + _disableTimestamps = value; + } + } + + public int Priority + { + get + { + CheckNotClosed(); + return _messagePriority; + } + set + { + CheckNotClosed(); + if (value < 0 || value > 9) + { + throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9"); + } + _messagePriority = value; + } + } + + public override void Close() + { + _logger.Info("Closing producer " + this); + Interlocked.Exchange(ref _closed, CLOSED); + _channel.DeregisterProducer(_producerId); + } + + public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive) + { + CheckNotClosed(); + SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive, DEFAULT_MANDATORY, + DEFAULT_IMMEDIATE); + } + + public void Send(IMessage msg) + { + CheckNotClosed(); + SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive, + DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); + } + + // This is a short-term hack (knowing that this code will be re-vamped sometime soon) + // to facilitate publishing messages to potentially non-existent recipients. + public void Send(IMessage msg, bool mandatory) + { + CheckNotClosed(); + SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive, + mandatory, DEFAULT_IMMEDIATE); + } + + public long TimeToLive + { + get + { + CheckNotClosed(); + return _timeToLive; + } + set + { + CheckNotClosed(); + if (value < 0) + { + throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value); + } + _timeToLive = value; + } + } + + #endregion + + private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate) + { + _channel.BasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, priority, timeToLive, _disableTimestamps); + } + + public string MimeType + { + set + { + CheckNotClosed(); + _mimeType = value; + } + } + + public string Encoding + { + set + { + CheckNotClosed(); + _encoding = value; + } + } + + public void Dispose() + { + Close(); + } + } +} |
