diff options
Diffstat (limited to 'dotnet/Qpid.Client/Client/AmqChannel.cs')
| -rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 286 |
1 files changed, 93 insertions, 193 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 3471ac3640..9a8b9f787a 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -36,7 +36,7 @@ namespace Qpid.Client { private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel)); - private const int BASIC_CONTENT_TYPE = 60; + internal const int BASIC_CONTENT_TYPE = 60; private static int _nextSessionNumber = 0; @@ -122,7 +122,7 @@ namespace Qpid.Client if (consumer == null) { - _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a handler - ignoring..."); + _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring..."); } else { @@ -156,103 +156,72 @@ namespace Qpid.Client } } - internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch) : - this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.NewDefaultRegistry(), defaultPrefetch) - { - } - /// <summary> /// Initializes a new instance of the <see cref="AmqChannel"/> class. /// </summary> - /// <param name="con">The con.</param> + /// <param name="con">The connection.</param> /// <param name="channelId">The channel id.</param> /// <param name="transacted">if set to <c>true</c> [transacted].</param> /// <param name="acknowledgeMode">The acknowledge mode.</param> - /// <param name="messageFactoryRegistry">The message factory registry.</param> - internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch) + /// <param name="defaultPrefetch">Default prefetch value</param> + internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch) + : this() + { + _sessionNumber = Interlocked.Increment(ref _nextSessionNumber); + _connection = con; + _transacted = transacted; + if ( transacted ) + { + _acknowledgeMode = AcknowledgeMode.SessionTransacted; + } else + { + _acknowledgeMode = acknowledgeMode; + } + _channelId = channelId; + } + + private AmqChannel() { - _sessionNumber = Interlocked.Increment(ref _nextSessionNumber); - _connection = con; - _transacted = transacted; - if (transacted) - { - _acknowledgeMode = AcknowledgeMode.SessionTransacted; - } - else - { - _acknowledgeMode = acknowledgeMode; - } - _channelId = channelId; - _messageFactoryRegistry = messageFactoryRegistry; + _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry(); + } + + /// <summary> + /// Create a disconnected channel that will fault + /// for most things, but is useful for testing + /// </summary> + /// <returns>A new disconnected channel</returns> + public static IChannel CreateDisconnectedChannel() + { + return new AmqChannel(); } + public IBytesMessage CreateBytesMessage() { - lock (_connection.FailoverMutex) - { - CheckNotClosed(); - try - { - return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); - } - catch (AMQException e) - { - throw new QpidException("Unable to create message: " + e); - } - } + return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); } public IMessage CreateMessage() { - lock (_connection.FailoverMutex) - { - CheckNotClosed(); - try - { - // TODO: this is supposed to create a message consisting only of message headers - return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); - } - catch (AMQException e) - { - throw new QpidException("Unable to create message: " + e); - } - } + // TODO: this is supposed to create a message consisting only of message headers + return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); + } + + public IMessage CreateMessage(string mimeType) + { + return _messageFactoryRegistry.CreateMessage(mimeType); } public ITextMessage CreateTextMessage() { - lock (_connection.FailoverMutex) - { - CheckNotClosed(); - - try - { - return (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain"); - } - catch (AMQException e) - { - throw new QpidException("Unable to create message: " + e); - } - } + return CreateTextMessage(String.Empty); } public ITextMessage CreateTextMessage(string text) { - lock (_connection.FailoverMutex) - { - CheckNotClosed(); - try - { - ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain"); - msg.Text = text; - return msg; - } - catch (AMQException e) - { - throw new QpidException("Unable to create message: " + e); - } - } + ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain"); + msg.Text = text; + return msg; } public bool Transacted @@ -538,11 +507,6 @@ namespace Qpid.Client } } - public IFieldTable CreateFieldTable() - { - return new FieldTable(); - } - public void Unsubscribe(String name) { throw new NotImplementedException(); // FIXME @@ -709,6 +673,30 @@ namespace Qpid.Client // at this point the _consumers map will be empty } + public void PurgeQueue(string queueName, bool noWait) + { + DoPurgeQueue(queueName, noWait); + } + + private void DoPurgeQueue(string queueName, bool noWait) + { + try + { + _logger.DebugFormat("PurgeQueue {0}", queueName); + + AMQFrame purgeQueue = QueuePurgeBody.CreateAMQFrame(_channelId, 0, queueName, noWait); + + if (noWait) + _connection.ProtocolWriter.Write(purgeQueue); + else + _connection.ConvenientProtocolWriter.SyncWrite(purgeQueue, typeof(QueuePurgeOkBody)); + } + catch (AMQException) + { + throw; + } + } + /** * Replays frame on fail over. * @@ -784,132 +772,44 @@ namespace Qpid.Client throw new NotImplementedException(); // FIXME } - public void DeleteQueue() + public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) { - throw new NotImplementedException(); // FIXME + DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait); } - public MessageConsumerBuilder CreateConsumerBuilder(string queueName) + private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) { - return new MessageConsumerBuilder(this, queueName); - } - - public MessagePublisherBuilder CreatePublisherBuilder() - { - return new MessagePublisherBuilder(this); - } - - internal void BasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, - AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, - bool disableTimestamps) - { - DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps); - } - - private void DoBasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode deliveryMode, uint timeToLive, int priority, bool disableTimestamps) - { - AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(_channelId, 0, exchangeName, - routingKey, mandatory, immediate); - - long currentTime = 0; - if (!disableTimestamps) + try { - currentTime = DateTime.UtcNow.Ticks; - message.Timestamp = currentTime; - } + _logger.Debug(string.Format("DeleteQueue name={0}", queueName)); + + AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, + queueName, // queueName + ifUnused, // IfUnUsed + ifEmpty, // IfEmpty + noWait); // NoWait - ByteBuffer buf = message.Data; - byte[] payload = null; - if (buf != null) - { - payload = new byte[buf.Remaining]; - buf.GetBytes(payload); - } - BasicContentHeaderProperties contentHeaderProperties = message.ContentHeaderProperties; + _replayFrames.Add(queueDelete); - if (timeToLive > 0) - { - if (!disableTimestamps) - { - contentHeaderProperties.Expiration = currentTime + timeToLive; - } - } - else - { - contentHeaderProperties.Expiration = 0; - } - contentHeaderProperties.SetDeliveryMode(deliveryMode); - contentHeaderProperties.Priority = (byte)priority; - - ContentBody[] contentBodies = CreateContentBodies(payload); - AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length]; - for (int i = 0; i < contentBodies.Length; i++) - { - frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]); - } - if (contentBodies.Length > 0 && _logger.IsDebugEnabled) - { - _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); + if (noWait) + _connection.ProtocolWriter.Write(queueDelete); + else + _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody)); } - - // weight argument of zero indicates no child content headers, just bodies - AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(_channelId, BASIC_CONTENT_TYPE, 0, contentHeaderProperties, - (uint)payload.Length); - if (_logger.IsDebugEnabled) + catch (AMQException) { - _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); + throw; } + } - frames[0] = publishFrame; - frames[1] = contentHeaderFrame; - CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); - - lock (_connection.FailoverMutex) { - _connection.ProtocolWriter.Write(compositeFrame); - } + public MessageConsumerBuilder CreateConsumerBuilder(string queueName) + { + return new MessageConsumerBuilder(this, queueName); } - /// <summary> - /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated - /// maximum frame size. - /// </summary> - /// <param name="payload"></param> - /// <returns>return the array of content bodies</returns> - private ContentBody[] CreateContentBodies(byte[] payload) + public MessagePublisherBuilder CreatePublisherBuilder() { - if (payload == null) - { - return null; - } - else if (payload.Length == 0) - { - return new ContentBody[0]; - } - // we substract one from the total frame maximum size to account for the end of frame marker in a body frame - // (0xCE byte). - long framePayloadMax = Connection.MaximumFrameSize - 1; - int lastFrame = (payload.Length % framePayloadMax) > 0 ? 1 : 0; - int frameCount = (int)(payload.Length / framePayloadMax) + lastFrame; - ContentBody[] bodies = new ContentBody[frameCount]; - - if (frameCount == 1) - { - bodies[0] = new ContentBody(); - bodies[0].Payload = payload; - } - else - { - long remaining = payload.Length; - for (int i = 0; i < bodies.Length; i++) - { - bodies[i] = new ContentBody(); - byte[] framePayload = new byte[(remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining]; - Array.Copy(payload, (int)framePayloadMax * i, framePayload, 0, framePayload.Length); - bodies[i].Payload = framePayload; - remaining -= framePayload.Length; - } - } - return bodies; + return new MessagePublisherBuilder(this); } public string GenerateUniqueName() |
