diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
commit | f83677056891e436bf5ba99e79240df2a44528cd (patch) | |
tree | 625bfd644b948e89105630759cf6decb0435354d /dotnet/Qpid.Client/Client/AmqChannel.cs | |
parent | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff) | |
download | qpid-python-f83677056891e436bf5ba99e79240df2a44528cd.tar.gz |
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client/AmqChannel.cs')
-rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 1241 |
1 files changed, 0 insertions, 1241 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs deleted file mode 100644 index 84c7c06fe1..0000000000 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ /dev/null @@ -1,1241 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Collections; -using System.Text.RegularExpressions; -using System.Threading; -using log4net; -using Apache.Qpid.Buffer; -using Apache.Qpid.Client.Message; -using Apache.Qpid.Client.Util; -using Apache.Qpid.Collections; -using Apache.Qpid.Framing; -using Apache.Qpid.Messaging; -using Apache.Qpid.Protocol; - -namespace Apache.Qpid.Client -{ - /// <summary> - /// <p/><table id="crc"><caption>CRC Card</caption> - /// <tr><th> Responsibilities <th> Collaborations - /// <tr><td> Declare queues. - /// <tr><td> Declare exchanges. - /// <tr><td> Bind queues to exchanges. - /// <tr><td> Create messages. - /// <tr><td> Set up message consumers on the channel. - /// <tr><td> Set up message producers on the channel. - /// <tr><td> Commit the current transaction. - /// <tr><td> Roll-back the current transaction. - /// <tr><td> Close the channel. - /// </table> - /// </summary> - public class AmqChannel : Closeable, IChannel - { - private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel)); - - internal const int BASIC_CONTENT_TYPE = 60; - - public const int DEFAULT_PREFETCH_HIGH_MARK = 5000; - - public const int DEFAULT_PREFETCH_LOW_MARK = 2500; - - private static int _nextSessionNumber = 0; - - private AMQConnection _connection; - - private int _sessionNumber; - - private bool _suspended; - - private object _suspensionLock = new object(); - - // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. - private int _nextConsumerNumber = 1; - - private bool _transacted; - - private AcknowledgeMode _acknowledgeMode; - - private ushort _channelId; - - private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; - - private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; - - private FlowControlQueue _queue; - - private Dispatcher _dispatcher; - - private MessageFactoryRegistry _messageFactoryRegistry; - - /// <summary> Holds all of the producers created by this channel. </summary> - private Hashtable _producers = Hashtable.Synchronized(new Hashtable()); - - /// <summary> Holds all of the consumers created by this channel. </summary> - private Hashtable _consumers = Hashtable.Synchronized(new Hashtable()); - - private ArrayList _replayFrames = new ArrayList(); - - /// <summary> - /// The counter of the _next producer id. This id is generated by the session and used only to allow the - /// producer to identify itself to the session when deregistering itself. - /// - /// Access to this id does not require to be synchronized since according to the JMS specification only one - /// thread of control is allowed to create producers for any given session instance. - /// </summary> - private long _nextProducerId; - - /// <summary> - /// Initializes a new instance of the <see cref="AmqChannel"/> class. - /// </summary> - /// <param name="con">The connection.</param> - /// <param name="channelId">The channel id.</param> - /// <param name="transacted">if set to <c>true</c> [transacted].</param> - /// <param name="acknowledgeMode">The acknowledge mode.</param> - /// <param name="defaultPrefetchHigh">Default prefetch high value</param> - /// <param name="defaultPrefetchLow">Default prefetch low value</param> - internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, - int defaultPrefetchHigh, int defaultPrefetchLow) - : this() - { - _sessionNumber = Interlocked.Increment(ref _nextSessionNumber); - _connection = con; - _transacted = transacted; - - if ( transacted ) - { - _acknowledgeMode = AcknowledgeMode.SessionTransacted; - } - else - { - _acknowledgeMode = acknowledgeMode; - } - - _channelId = channelId; - _defaultPrefetchHighMark = defaultPrefetchHigh; - _defaultPrefetchLowMark = defaultPrefetchLow; - - if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) - { - _queue = new FlowControlQueue(_defaultPrefetchLowMark, _defaultPrefetchHighMark, - new ThresholdMethod(OnPrefetchLowMark), - new ThresholdMethod(OnPrefetchHighMark)); - } - else - { - // low and upper are the same - _queue = new FlowControlQueue(_defaultPrefetchHighMark, _defaultPrefetchHighMark, - null, null); - } - } - - private AmqChannel() - { - _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry(); - } - - /// <summary> - /// Acknowledge mode for messages received. - /// </summary> - public AcknowledgeMode AcknowledgeMode - { - get - { - CheckNotClosed(); - return _acknowledgeMode; - } - } - - /// <summary> - /// True if the channel should use transactions. - /// </summary> - public bool Transacted - { - get - { - CheckNotClosed(); - return _transacted; - } - } - - /// <summary> - /// Prefetch value to be used as the default for - /// consumers created on this channel. - /// </summary> - public int DefaultPrefetch - { - get { return DefaultPrefetchHigh; } - } - - /// <summary> - /// Prefetch low value to be used as the default for - /// consumers created on this channel. - /// </summary> - public int DefaultPrefetchLow - { - get { return _defaultPrefetchLowMark; } - } - - /// <summary> - /// Prefetch high value to be used as the default for - /// consumers created on this channel. - /// </summary> - public int DefaultPrefetchHigh - { - get { return _defaultPrefetchHighMark; } - } - - /// <summary> Indicates whether or not this channel is currently suspended. </summary> - public bool IsSuspended - { - get { return _suspended; } - } - - /// <summary> Provides the channels number within the the connection. </summary> - public ushort ChannelId - { - get { return _channelId; } - } - - /// <summary> Provides the connection that this channel runs over. </summary> - public AMQConnection Connection - { - get { return _connection; } - } - - /// <summary> - /// Declare a new exchange. - /// </summary> - /// <param name="exchangeName">Name of the exchange</param> - /// <param name="exchangeClass">Class of the exchange, from <see cref="ExchangeClassConstants"/></param> - public void DeclareExchange(String exchangeName, String exchangeClass) - { - _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass)); - - DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null); - } - - /// <summary> - /// Declare a new exchange using the default exchange class. - /// </summary> - /// <param name="exchangeName">Name of the exchange</param> - public void DeleteExchange(string exchangeName) - { - throw new NotImplementedException(); - } - - /// <summary> - /// Declare a new queue with the specified set of arguments. - /// </summary> - /// <param name="queueName">Name of the queue</param> - /// <param name="isDurable">True if the queue should be durable</param> - /// <param name="isExclusive">True if the queue should be exclusive to this channel</param> - /// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param> - public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) - { - DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete, null); - } - - /// <summary> - /// Declare a new queue with the specified set of arguments. - /// </summary> - /// <param name="queueName">Name of the queue</param> - /// <param name="isDurable">True if the queue should be durable</param> - /// <param name="isExclusive">True if the queue should be exclusive to this channel</param> - /// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param> - /// <param name="args">Optional arguments to Queue.Declare</param> - public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete, IFieldTable args) - { - DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete, args); - } - - /// <summary> - /// Delete a queue with the specifies arguments. - /// </summary> - /// <param name="queueName">Name of the queue to delete</param> - /// <param name="ifUnused">If true, the queue will not deleted if it has no consumers</param> - /// <param name="ifEmpty">If true, the queue will not deleted if it has no messages</param> - /// <param name="noWait">If true, the server will not respond to the method</param> - public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) - { - DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait); - } - - /// <summary> - /// Generate a new Unique name to use for a queue. - /// </summary> - /// <returns>A unique name to this channel</returns> - public string GenerateUniqueName() - { - string result = _connection.ProtocolSession.GenerateQueueName(); - return Regex.Replace(result, "[^a-z0-9_]", "_"); - } - - /// <summary> - /// Removes all messages from a queue. - /// </summary> - /// <param name="queueName">Name of the queue to delete</param> - /// <param name="noWait">If true, the server will not respond to the method</param> - public void PurgeQueue(string queueName, bool noWait) - { - DoPurgeQueue(queueName, noWait); - } - - /// <summary> - /// Bind a queue to the specified exchange. - /// </summary> - /// <param name="queueName">Name of queue to bind</param> - /// <param name="exchangeName">Name of exchange to bind to</param> - /// <param name="routingKey">Routing key</param> - public void Bind(string queueName, string exchangeName, string routingKey) - { - DoBind(queueName, exchangeName, routingKey, new FieldTable()); - } - - /// <summary> - /// Bind a queue to the specified exchange. - /// </summary> - /// <param name="queueName">Name of queue to bind</param> - /// <param name="exchangeName">Name of exchange to bind to</param> - /// <param name="routingKey">Routing key</param> - /// <param name="args">Table of arguments for the binding. Used to bind with a Headers Exchange</param> - public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args) - { - DoBind(queueName, exchangeName, routingKey, (FieldTable)args); - } - - /// <summary> - /// Create a new empty message with no body. - /// </summary> - /// <returns>The new message</returns> - public IMessage CreateMessage() - { - return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); - } - - /// <summary> - /// Create a new message of the specified MIME type. - /// </summary> - /// <param name="mimeType">The mime type to create</param> - /// <returns>The new message</returns> - public IMessage CreateMessage(string mimeType) - { - return _messageFactoryRegistry.CreateMessage(mimeType); - } - - /// <summary> - /// Creates a new message for bytes (application/octet-stream). - /// </summary> - /// <returns>The new message</returns> - public IBytesMessage CreateBytesMessage() - { - return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); - } - - /// <summary> - /// Creates a new text message (text/plain) with empty content. - /// </summary> - /// <returns>The new message</returns> - public ITextMessage CreateTextMessage() - { - return CreateTextMessage(String.Empty); - } - - /// <summary> - /// Creates a new text message (text/plain) with a body. - /// </summary> - /// <param name="text">Initial body of the message</param> - /// <returns>The new message</returns> - public ITextMessage CreateTextMessage(string text) - { - ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain"); - msg.Text = text; - return msg; - } - - /// <summary> - /// Creates a new Consumer using the builder pattern. - /// </summary> - /// <param name="queueName">Name of queue to receive messages from</param> - /// <returns>The builder object</returns> - public MessageConsumerBuilder CreateConsumerBuilder(string queueName) - { - return new MessageConsumerBuilder(this, queueName); - } - - /// <summary> - /// Creates a new consumer. - /// </summary> - /// <param name="queueName">Name of queue to receive messages from</param> - /// <param name="prefetchLow">Low prefetch value</param> - /// <param name="prefetchHigh">High prefetch value</param> - /// <param name="noLocal">If true, messages sent on this channel will not be received by this consumer</param> - /// <param name="exclusive">If true, the consumer opens the queue in exclusive mode</param> - /// <returns>The new consumer</returns> - public IMessageConsumer CreateConsumer(string queueName, - int prefetchLow, - int prefetchHigh, - bool noLocal, - bool exclusive) - { - _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} ", - queueName, prefetchLow, prefetchHigh, noLocal, exclusive)); - - return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, false); - } - - /// <summary> - /// Creates a new consumer. - /// </summary> - /// <param name="queueName">Name of queue to receive messages from</param> - /// <param name="prefetchLow">Low prefetch value</param> - /// <param name="prefetchHigh">High prefetch value</param> - /// <param name="noLocal">If true, messages sent on this channel will not be received by this consumer</param> - /// <param name="exclusive">If true, the consumer opens the queue in exclusive mode</param> - /// <param name="browse">If true, the consumer only browses and does not consume messages</param> - /// <returns>The new consumer</returns> - public IMessageConsumer CreateConsumer(string queueName, - int prefetchLow, - int prefetchHigh, - bool noLocal, - bool exclusive, - bool browse) - { - _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} browse={5}", - queueName, prefetchLow, prefetchHigh, noLocal, exclusive, browse)); - - return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, browse); - } - - - /// <summary> - /// Unsubscribe from a queue. - /// </summary> - /// <param name="subscriptionName">Subscription name</param> - public void Unsubscribe(String name) - { - throw new NotImplementedException(); - } - - /// <summary> - /// Create a new message publisher using the builder pattern. - /// </summary> - /// <returns>The builder object</returns> - public MessagePublisherBuilder CreatePublisherBuilder() - { - return new MessagePublisherBuilder(this); - } - - /// <summary> - /// Create a new message publisher. - /// </summary> - /// <param name="exchangeName">Name of exchange to publish to</param> - /// <param name="routingKey">Routing key</param> - /// <param name="deliveryMode">Default delivery mode</param> - /// <param name="timeToLive">Default TTL time of messages</param> - /// <param name="immediate">If true, sent immediately</param> - /// <param name="mandatory">If true, the broker will return an error - /// (as a connection exception) if the message cannot be delivered</param> - /// <param name="priority">Default message priority</param> - /// <returns>The new message publisher</returns> - public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode, - long timeToLive, bool immediate, bool mandatory, int priority) - { - _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}", - exchangeName, "none", routingKey)); - - return CreateProducerImpl(exchangeName, routingKey, deliveryMode, - timeToLive, immediate, mandatory, priority); - } - - /// <summary> - /// Recover after transaction failure. - /// </summary> - /// <remarks>The 0-8 protocol does not support this, not implemented exception will always be thrown.</remarks> - public void Recover() - { - CheckNotClosed(); - CheckNotTransacted(); - - throw new NotImplementedException(); - } - - /// <summary> - /// Commit the transaction. - /// </summary> - public void Commit() - { - // FIXME: Fail over safety. Needs FailoverSupport? - CheckNotClosed(); - CheckTransacted(); // throws IllegalOperationException if not a transacted session - - try - { - // Acknowledge up to message last delivered (if any) for each consumer. - // Need to send ack for messages delivered to consumers so far. - foreach (BasicMessageConsumer consumer in _consumers.Values) - { - // Sends acknowledgement to server. - consumer.AcknowledgeDelivered(); - } - - // Commits outstanding messages sent and outstanding acknowledgements. - _connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId), typeof(TxCommitOkBody)); - } - catch (AMQException e) - { - throw new QpidException("Failed to commit", e); - } - } - - /// <summary> - /// Rollback the transaction. - /// </summary> - public void Rollback() - { - lock (_suspensionLock) - { - CheckTransacted(); // throws IllegalOperationException if not a transacted session - - try - { - bool suspended = IsSuspended; - if (!suspended) - { - Suspend(true); - } - - // Reject up to message last delivered (if any) for each consumer. - // Need to send reject for messages delivered to consumers so far. - foreach (BasicMessageConsumer consumer in _consumers.Values) - { - // Sends acknowledgement to server. - consumer.RejectUnacked(); - } - - _connection.ConvenientProtocolWriter.SyncWrite(TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); - - if ( !suspended ) - { - Suspend(false); - } - } - catch (AMQException e) - { - throw new QpidException("Failed to rollback", e); - } - } - } - - /// <summary> - /// Create a disconnected channel that will fault - /// for most things, but is useful for testing - /// </summary> - /// <returns>A new disconnected channel</returns> - public static IChannel CreateDisconnectedChannel() - { - return new AmqChannel(); - } - - public override void Close() - { - lock (_connection.FailoverMutex) - { - // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session - - lock (_closingLock) - { - SetClosed(); - - // we pass null since this is not an error case - CloseProducersAndConsumers(null); - - try - { - _connection.CloseSession(this); - } - catch (AMQException e) - { - throw new QpidException("Error closing session: " + e); - } - finally - { - _connection.DeregisterSession(_channelId); - } - } - } - } - - /** - * Called when the server initiates the closure of the session - * unilaterally. - * @param e the exception that caused this session to be closed. Null causes the - */ - public void ClosedWithException(Exception e) - { - lock (_connection.FailoverMutex) - { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - SetClosed(); - AMQException amqe; - - if (e is AMQException) - { - amqe = (AMQException) e; - } - else - { - amqe = new AMQException("Closing session forcibly", e); - } - - _connection.DeregisterSession(_channelId); - CloseProducersAndConsumers(amqe); - } - } - - public void MessageReceived(UnprocessedMessage message) - { - if (_logger.IsDebugEnabled) - { - _logger.Debug("Message received in session with channel id " + _channelId); - } - - if ( message.DeliverBody == null ) - { - ReturnBouncedMessage(message); - } - else - { - _queue.Enqueue(message); - } - } - - public void Dispose() - { - Close(); - } - - private void SetClosed() - { - Interlocked.Exchange(ref _closed, CLOSED); - } - - /// <summary> - /// Close all producers or consumers. This is called either in the error case or when closing the session normally. - /// <param name="amqe">the exception, may be null to indicate no error has occurred</param> - /// - private void CloseProducersAndConsumers(AMQException amqe) - { - try - { - CloseProducers(); - } - catch (QpidException e) - { - _logger.Error("Error closing session: " + e, e); - } - try - { - CloseConsumers(amqe); - } - catch (QpidException e) - { - _logger.Error("Error closing session: " + e, e); - } - } - - /// <summary> - /// Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is - /// currently no way of propagating errors to message producers (this is a JMS limitation). - /// </summary> - private void CloseProducers() - { - _logger.Debug("Closing producers on session " + this); - - // we need to clone the list of producers since the close() method updates the _producers collection - // which would result in a concurrent modification exception - ArrayList clonedProducers = new ArrayList(_producers.Values); - - foreach (BasicMessageProducer prod in clonedProducers) - { - _logger.Debug("Closing producer " + prod); - prod.Close(); - } - - // at this point the _producers map is empty - } - - /// <summary> - /// Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error. - /// <param name="error">not null if this is a result of an error occurring at the connection level</param> - private void CloseConsumers(Exception error) - { - if (_dispatcher != null) - { - _dispatcher.StopDispatcher(); - } - - // we need to clone the list of consumers since the close() method updates the _consumers collection - // which would result in a concurrent modification exception - ArrayList clonedConsumers = new ArrayList(_consumers.Values); - - foreach (BasicMessageConsumer con in clonedConsumers) - { - if (error != null) - { - con.NotifyError(error); - } - else - { - con.Close(); - } - } - - // at this point the _consumers map will be empty - } - - private IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey, - DeliveryMode deliveryMode, - long timeToLive, bool immediate, bool mandatory, int priority) - { - lock (_closingLock) - { - CheckNotClosed(); - - try - { - return new BasicMessageProducer(exchangeName, routingKey, _transacted, _channelId, - this, GetNextProducerId(), - deliveryMode, timeToLive, immediate, mandatory, priority); - } - catch (AMQException e) - { - _logger.Error("Error creating message producer: " + e, e); - throw new QpidException("Error creating message producer", e); - } - } - } - - /// <summary> Creates a message consumer on this channel.</summary> - /// - /// <param name="queueName">The name of the queue to attach the consumer to.</param> - /// <param name="prefetchLow">The pre-fetch buffer low-water mark.</param> - /// <param name="prefetchHigh">The pre-fetch buffer high-water mark.</param> - /// <param name="noLocal">The no-local flag, <tt>true</tt> means that the consumer does not receive messages sent on this channel.</param> - /// <param name="exclusive">The exclusive flag, <tt>true</tt> gives this consumer exclusive receive access to the queue.</param> - /// - /// <return>The message consumer.</return> - private IMessageConsumer CreateConsumerImpl(string queueName, - int prefetchLow, - int prefetchHigh, - bool noLocal, - bool exclusive, - bool browse) - { - lock (_closingLock) - { - CheckNotClosed(); - - BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal, - _messageFactoryRegistry, this, - prefetchHigh, prefetchLow, exclusive, - browse); - try - { - RegisterConsumer(consumer); - } - catch (AMQException e) - { - throw new QpidException("Error registering consumer: " + e, e); - } - - return consumer; - } - } - - private void CheckTransacted() - { - if (!Transacted) - { - throw new InvalidOperationException("Channel is not transacted"); - } - } - - private void CheckNotTransacted() - { - if (Transacted) - { - throw new InvalidOperationException("Channel is transacted"); - } - } - - internal void Start() - { - _dispatcher = new Dispatcher(this); - Thread dispatcherThread = new Thread(new ThreadStart(_dispatcher.RunDispatcher)); - dispatcherThread.IsBackground = true; - dispatcherThread.Start(); - } - - internal void Stop() - { - Suspend(true); - if (_dispatcher != null) - { - _dispatcher.StopDispatcher(); - } - } - - internal void RegisterConsumer(string consumerTag, IMessageConsumer consumer) - { - _consumers[consumerTag] = consumer; - } - - /// <summary> - /// Called by the MessageConsumer when closing, to deregister the consumer from the - /// map from consumerTag to consumer instance. - /// </summary> - /// <param name="consumerTag">the consumer tag, that was broker-generated</param> - internal void DeregisterConsumer(string consumerTag) - { - _consumers.Remove(consumerTag); - } - - internal void RegisterProducer(long producerId, IMessagePublisher publisher) - { - _producers[producerId] = publisher; - } - - internal void DeregisterProducer(long producerId) - { - _producers.Remove(producerId); - } - - private long GetNextProducerId() - { - return ++_nextProducerId; - } - - /** - * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after - * failover when the client has veoted resubscription. - * - * The caller of this method must already hold the failover mutex. - */ - internal void MarkClosed() - { - SetClosed(); - _connection.DeregisterSession(_channelId); - MarkClosedProducersAndConsumers(); - } - - private void MarkClosedProducersAndConsumers() - { - try - { - // no need for a markClosed* method in this case since there is no protocol traffic closing a producer - CloseProducers(); - } - catch (QpidException e) - { - _logger.Error("Error closing session: " + e, e); - } - try - { - MarkClosedConsumers(); - } - catch (QpidException e) - { - _logger.Error("Error closing session: " + e, e); - } - } - - private void MarkClosedConsumers() - { - if (_dispatcher != null) - { - _dispatcher.StopDispatcher(); - } - // we need to clone the list of consumers since the close() method updates the _consumers collection - // which would result in a concurrent modification exception - ArrayList clonedConsumers = new ArrayList(_consumers.Values); - - foreach (BasicMessageConsumer consumer in clonedConsumers) - { - consumer.MarkClosed(); - } - // at this point the _consumers map will be empty - } - - private void DoPurgeQueue(string queueName, bool noWait) - { - try - { - _logger.DebugFormat("PurgeQueue {0}", queueName); - - AMQFrame purgeQueue = QueuePurgeBody.CreateAMQFrame(_channelId, 0, queueName, noWait); - - if (noWait) - _connection.ProtocolWriter.Write(purgeQueue); - else - _connection.ConvenientProtocolWriter.SyncWrite(purgeQueue, typeof(QueuePurgeOkBody)); - } - catch (AMQException) - { - throw; - } - } - - /** - * Replays frame on fail over. - * - * @throws AMQException - */ - internal void ReplayOnFailOver() - { - _logger.Debug(string.Format("Replaying frames for channel {0}", _channelId)); - foreach (AMQFrame frame in _replayFrames) - { - _logger.Debug(string.Format("Replaying frame=[{0}]", frame)); - _connection.ProtocolWriter.Write(frame); - } - } - - /// <summary> - /// Callers must hold the failover mutex before calling this method. - /// </summary> - /// <param name="consumer"></param> - private void RegisterConsumer(BasicMessageConsumer consumer) - { - // Need to generate a consumer tag on the client so we can exploit the nowait flag. - String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); - consumer.ConsumerTag = tag; - _consumers.Add(tag, consumer); - - String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, - consumer.Exclusive, consumer.AcknowledgeMode, tag, consumer.Browse); - - } - - internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) - { - - _logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}", - queueName, exchangeName, routingKey, args)); - - AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, - queueName, exchangeName, - routingKey, false, args); - - - lock (_connection.FailoverMutex) - { - _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody)); - } - // AS FIXME: wasnae me - _replayFrames.Add(QueueBindBody.CreateAMQFrame(_channelId, 0, - queueName, exchangeName, - routingKey, true, args)); - } - - private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag, bool browse) - { - FieldTable args = new FieldTable(); - if(browse) - { - args["x-filter-no-consume"] = true; - } - AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, - queueName, tag, noLocal, - acknowledgeMode == AcknowledgeMode.NoAcknowledge, - exclusive, true, args); - - _replayFrames.Add(basicConsume); - - _connection.ProtocolWriter.Write(basicConsume); - return tag; - } - - private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) - { - try - { - _logger.Debug(string.Format("DeleteQueue name={0}", queueName)); - - AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait); - - if (noWait) - { - _connection.ProtocolWriter.Write(queueDelete); - } - else - { - _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody)); - } - // AS FIXME: wasnae me - _replayFrames.Add(QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, true)); - } - catch (AMQException) - { - throw; - } - } - - private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete, IFieldTable args) - { - _logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}", - queueName, isDurable, isExclusive, isAutoDelete)); - - AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, - isAutoDelete, false, (FieldTable) args); - - - lock (_connection.FailoverMutex) - { - _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody)); - } - // AS FIXME: wasnae me - _replayFrames.Add(QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, - isAutoDelete, true, null)); - } - - // AMQP-level method. - private void DeclareExchange(ushort channelId, ushort ticket, string exchangeName, - string exchangeClass, bool passive, bool durable, - bool autoDelete, bool xinternal, bool noWait, FieldTable args) - { - _logger.Debug(String.Format("DeclareExchange channelId={0} exchangeName={1} exchangeClass={2}", - _channelId, exchangeName, exchangeClass)); - - AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive, - durable, autoDelete, xinternal, noWait, args); - - if (noWait) - { - lock (_connection.FailoverMutex) - { - _connection.ProtocolWriter.Write(declareExchange); - } - // AS FIXME: wasnae me - _replayFrames.Add(declareExchange); - } - else - { - throw new NotImplementedException("Don't use nowait=false with DeclareExchange"); - //_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); - } - } - - /** - * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from - * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is - * AUTO_ACK or similar. - * - * @param deliveryTag the tag of the last message to be acknowledged - * @param multiple if true will acknowledge all messages up to and including the one specified by the - * delivery tag - */ - internal void AcknowledgeMessage(ulong deliveryTag, bool multiple) - { - AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple); - if (_logger.IsDebugEnabled) - { - _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); - } - // FIXME: lock FailoverMutex here? - _connection.ProtocolWriter.Write(ackFrame); - } - - public void RejectMessage(ulong deliveryTag, bool requeue) - { - if ((_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) || (_acknowledgeMode == AcknowledgeMode.SessionTransacted)) - { - AMQFrame rejectFrame = BasicRejectBody.CreateAMQFrame(_channelId, deliveryTag, requeue); - _connection.ProtocolWriter.Write(rejectFrame); - } - } - - /// <summary> - /// Handle a message that bounced from the server, creating - /// the corresponding exception and notifying the connection about it - /// </summary> - /// <param name="message">Unprocessed message</param> - private void ReturnBouncedMessage(UnprocessedMessage message) - { - try - { - AbstractQmsMessage bouncedMessage = - _messageFactoryRegistry.CreateMessage(0, false, message.ContentHeader, message.Bodies); - - int errorCode = message.BounceBody.ReplyCode; - string reason = message.BounceBody.ReplyText; - _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); - AMQException exception; - - if (errorCode == AMQConstant.NO_CONSUMERS.Code) - { - exception = new AMQNoConsumersException(reason, bouncedMessage); - } - else if (errorCode == AMQConstant.NO_ROUTE.Code) - { - exception = new AMQNoRouteException(reason, bouncedMessage); - } - else - { - exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage); - } - - _connection.ExceptionReceived(exception); - } - catch (Exception ex) - { - _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex); - } - } - - private void OnPrefetchLowMark(int count) - { - if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge) - { - _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count); - Suspend(false); - } - } - - private void OnPrefetchHighMark(int count) - { - if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge) - { - _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count); - Suspend(true); - } - } - - private void Suspend(bool suspend) - { - lock (_suspensionLock) - { - if (_logger.IsDebugEnabled) - { - _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); - } - - _suspended = suspend; - AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend); - - Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody)); - } - } - - /// <summary>A Dispatcher turns the consumption of incoming messages from an arrival queue, into event notifications on consumers. - /// The arrival queue is typically a blocking queue, on which a dispatcher waits for messages to consume. Upon receipt of a message - /// the dispatcher finds the consumer that is listening to the queue to which the message has been send and notifies it of the new - /// message. - /// - /// <p/>The Dispatcher also contains logic to recognize bounced messages. Bounced messages returned from the broker can be - /// told apart from regular deliveries because they do not have a delivery queue set on them. When the dispatcher receives a - /// bounced message it creates an exception and notifies the connection, to which its containing channel belongs, of the condition. - /// - /// <p/><table id="crc"><caption>CRC Card</caption> - /// <tr><th> Responsibilities <th> Collaborations - /// <tr><td> Notify consumers of message arrivals on their queues. <td> <see cref="BasicMessageConsumer"/> - /// <tr><td> Notify the containing connection of bounced message arrivals. <td> <see cref="AMQConnection"/> - /// </table> - /// </summary> - /// - /// <remarks>Stop mechanism seems wrong, as queue consume is evaluated after stop flag, so could consume and notify one more message. - /// Placing stop check after consume may also be wrong as it may cause a message to be thrown away. Seems more correct to use interupt on - /// the block thread to cause it to prematurely return from its wait, whereupon it can be made to re-check the stop flag.</remarks> - /// - /// <remarks>Exception swallowed, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should - /// fall through and terminate the loop, as it is a bug if it occurrs.</remarks> - private class Dispatcher - { - /// <summary> Flag used to indicate when this dispatcher is to be stopped (0=go, 1=stop). </summary> - private int _stopped = 0; - - /// <summary> The channel for which this is a dispatcher. </summary> - private AmqChannel _containingChannel; - - /// <summary> Creates a dispatcher on the specified channel. </summary> - /// - /// <param name="containingChannel"> The channel on which this is a dispatcher. </param> - public Dispatcher(AmqChannel containingChannel) - { - _containingChannel = containingChannel; - } - - /// <summary>The message dispatch loop. Consumes messages from the channels queue, notifying consumers of regular deliveries, and - /// the connection of bounced messages.</summary> - public void RunDispatcher() - { - UnprocessedMessage message; - - while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null) - { - if (message.DeliverBody != null) - { - BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag]; - - if (consumer == null) - { - _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring..."); - } - else - { - consumer.NotifyMessage(message, _containingChannel.ChannelId); - } - } - else - { - try - { - // Bounced message is processed here, away from the transport thread - AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry. - CreateMessage(0, false, message.ContentHeader, message.Bodies); - - int errorCode = message.BounceBody.ReplyCode; - string reason = message.BounceBody.ReplyText; - - _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); - } - catch (Exception e) - { - _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); - } - } - } - - _logger.Debug("Dispatcher thread terminating for channel: " + _containingChannel._channelId + "."); - } - - /// <summary> Sets a stop flag on this dispatcher, which causes its dispatch loop to exit at the next available opportunity. </summary> - public void StopDispatcher() - { - Interlocked.Exchange(ref _stopped, 1); - } - } - } -} |