diff options
Diffstat (limited to 'dotnet/Qpid.Client/Client')
| -rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 63 | ||||
| -rw-r--r-- | dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | 10 |
2 files changed, 62 insertions, 11 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 86dc9a4681..84c7c06fe1 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -251,7 +251,20 @@ namespace Apache.Qpid.Client /// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param> public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) { - DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete); + DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete, null); + } + + /// <summary> + /// Declare a new queue with the specified set of arguments. + /// </summary> + /// <param name="queueName">Name of the queue</param> + /// <param name="isDurable">True if the queue should be durable</param> + /// <param name="isExclusive">True if the queue should be exclusive to this channel</param> + /// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param> + /// <param name="args">Optional arguments to Queue.Declare</param> + public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete, IFieldTable args) + { + DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete, args); } /// <summary> @@ -386,9 +399,33 @@ namespace Apache.Qpid.Client _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} ", queueName, prefetchLow, prefetchHigh, noLocal, exclusive)); - return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive); + return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, false); + } + + /// <summary> + /// Creates a new consumer. + /// </summary> + /// <param name="queueName">Name of queue to receive messages from</param> + /// <param name="prefetchLow">Low prefetch value</param> + /// <param name="prefetchHigh">High prefetch value</param> + /// <param name="noLocal">If true, messages sent on this channel will not be received by this consumer</param> + /// <param name="exclusive">If true, the consumer opens the queue in exclusive mode</param> + /// <param name="browse">If true, the consumer only browses and does not consume messages</param> + /// <returns>The new consumer</returns> + public IMessageConsumer CreateConsumer(string queueName, + int prefetchLow, + int prefetchHigh, + bool noLocal, + bool exclusive, + bool browse) + { + _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} browse={5}", + queueName, prefetchLow, prefetchHigh, noLocal, exclusive, browse)); + + return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, browse); } + /// <summary> /// Unsubscribe from a queue. /// </summary> @@ -712,7 +749,8 @@ namespace Apache.Qpid.Client int prefetchLow, int prefetchHigh, bool noLocal, - bool exclusive) + bool exclusive, + bool browse) { lock (_closingLock) { @@ -720,7 +758,8 @@ namespace Apache.Qpid.Client BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal, _messageFactoryRegistry, this, - prefetchHigh, prefetchLow, exclusive); + prefetchHigh, prefetchLow, exclusive, + browse); try { RegisterConsumer(consumer); @@ -894,7 +933,7 @@ namespace Apache.Qpid.Client _consumers.Add(tag, consumer); String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, - consumer.Exclusive, consumer.AcknowledgeMode, tag); + consumer.Exclusive, consumer.AcknowledgeMode, tag, consumer.Browse); } @@ -919,13 +958,17 @@ namespace Apache.Qpid.Client routingKey, true, args)); } - private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag) + private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag, bool browse) { - + FieldTable args = new FieldTable(); + if(browse) + { + args["x-filter-no-consume"] = true; + } AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, queueName, tag, noLocal, acknowledgeMode == AcknowledgeMode.NoAcknowledge, - exclusive, true, new FieldTable()); + exclusive, true, args); _replayFrames.Add(basicConsume); @@ -958,13 +1001,13 @@ namespace Apache.Qpid.Client } } - private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) + private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete, IFieldTable args) { _logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}", queueName, isDurable, isExclusive, isAutoDelete)); AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, - isAutoDelete, false, null); + isAutoDelete, false, (FieldTable) args); lock (_connection.FailoverMutex) diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index 6fee316cb4..fdac5e75f2 100644 --- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -44,6 +44,13 @@ namespace Apache.Qpid.Client get { return _exclusive; } } + private bool _browse; + + public bool Browse + { + get { return _browse; } + } + public bool NoLocal { get { return _noLocal; } @@ -131,7 +138,7 @@ namespace Apache.Qpid.Client internal BasicMessageConsumer(ushort channelId, string queueName, bool noLocal, MessageFactoryRegistry messageFactory, AmqChannel channel, - int prefetchHigh, int prefetchLow, bool exclusive) + int prefetchHigh, int prefetchLow, bool exclusive, bool browse) { _channelId = channelId; _queueName = queueName; @@ -142,6 +149,7 @@ namespace Apache.Qpid.Client _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; + _browse = browse; if (_acknowledgeMode == AcknowledgeMode.SessionTransacted) { |
