summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client/AmqChannel.cs
diff options
context:
space:
mode:
Diffstat (limited to 'dotnet/Qpid.Client/Client/AmqChannel.cs')
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs286
1 files changed, 93 insertions, 193 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
index 3471ac3640..9a8b9f787a 100644
--- a/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -36,7 +36,7 @@ namespace Qpid.Client
{
private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel));
- private const int BASIC_CONTENT_TYPE = 60;
+ internal const int BASIC_CONTENT_TYPE = 60;
private static int _nextSessionNumber = 0;
@@ -122,7 +122,7 @@ namespace Qpid.Client
if (consumer == null)
{
- _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a handler - ignoring...");
+ _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring...");
}
else
{
@@ -156,103 +156,72 @@ namespace Qpid.Client
}
}
- internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch) :
- this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.NewDefaultRegistry(), defaultPrefetch)
- {
- }
-
/// <summary>
/// Initializes a new instance of the <see cref="AmqChannel"/> class.
/// </summary>
- /// <param name="con">The con.</param>
+ /// <param name="con">The connection.</param>
/// <param name="channelId">The channel id.</param>
/// <param name="transacted">if set to <c>true</c> [transacted].</param>
/// <param name="acknowledgeMode">The acknowledge mode.</param>
- /// <param name="messageFactoryRegistry">The message factory registry.</param>
- internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch)
+ /// <param name="defaultPrefetch">Default prefetch value</param>
+ internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch)
+ : this()
+ {
+ _sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
+ _connection = con;
+ _transacted = transacted;
+ if ( transacted )
+ {
+ _acknowledgeMode = AcknowledgeMode.SessionTransacted;
+ } else
+ {
+ _acknowledgeMode = acknowledgeMode;
+ }
+ _channelId = channelId;
+ }
+
+ private AmqChannel()
{
- _sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
- _connection = con;
- _transacted = transacted;
- if (transacted)
- {
- _acknowledgeMode = AcknowledgeMode.SessionTransacted;
- }
- else
- {
- _acknowledgeMode = acknowledgeMode;
- }
- _channelId = channelId;
- _messageFactoryRegistry = messageFactoryRegistry;
+ _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry();
+ }
+
+ /// <summary>
+ /// Create a disconnected channel that will fault
+ /// for most things, but is useful for testing
+ /// </summary>
+ /// <returns>A new disconnected channel</returns>
+ public static IChannel CreateDisconnectedChannel()
+ {
+ return new AmqChannel();
}
+
public IBytesMessage CreateBytesMessage()
{
- lock (_connection.FailoverMutex)
- {
- CheckNotClosed();
- try
- {
- return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
- }
- catch (AMQException e)
- {
- throw new QpidException("Unable to create message: " + e);
- }
- }
+ return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
}
public IMessage CreateMessage()
{
- lock (_connection.FailoverMutex)
- {
- CheckNotClosed();
- try
- {
- // TODO: this is supposed to create a message consisting only of message headers
- return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
- }
- catch (AMQException e)
- {
- throw new QpidException("Unable to create message: " + e);
- }
- }
+ // TODO: this is supposed to create a message consisting only of message headers
+ return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
+ }
+
+ public IMessage CreateMessage(string mimeType)
+ {
+ return _messageFactoryRegistry.CreateMessage(mimeType);
}
public ITextMessage CreateTextMessage()
{
- lock (_connection.FailoverMutex)
- {
- CheckNotClosed();
-
- try
- {
- return (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
- }
- catch (AMQException e)
- {
- throw new QpidException("Unable to create message: " + e);
- }
- }
+ return CreateTextMessage(String.Empty);
}
public ITextMessage CreateTextMessage(string text)
{
- lock (_connection.FailoverMutex)
- {
- CheckNotClosed();
- try
- {
- ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
- msg.Text = text;
- return msg;
- }
- catch (AMQException e)
- {
- throw new QpidException("Unable to create message: " + e);
- }
- }
+ ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
+ msg.Text = text;
+ return msg;
}
public bool Transacted
@@ -538,11 +507,6 @@ namespace Qpid.Client
}
}
- public IFieldTable CreateFieldTable()
- {
- return new FieldTable();
- }
-
public void Unsubscribe(String name)
{
throw new NotImplementedException(); // FIXME
@@ -709,6 +673,30 @@ namespace Qpid.Client
// at this point the _consumers map will be empty
}
+ public void PurgeQueue(string queueName, bool noWait)
+ {
+ DoPurgeQueue(queueName, noWait);
+ }
+
+ private void DoPurgeQueue(string queueName, bool noWait)
+ {
+ try
+ {
+ _logger.DebugFormat("PurgeQueue {0}", queueName);
+
+ AMQFrame purgeQueue = QueuePurgeBody.CreateAMQFrame(_channelId, 0, queueName, noWait);
+
+ if (noWait)
+ _connection.ProtocolWriter.Write(purgeQueue);
+ else
+ _connection.ConvenientProtocolWriter.SyncWrite(purgeQueue, typeof(QueuePurgeOkBody));
+ }
+ catch (AMQException)
+ {
+ throw;
+ }
+ }
+
/**
* Replays frame on fail over.
*
@@ -784,132 +772,44 @@ namespace Qpid.Client
throw new NotImplementedException(); // FIXME
}
- public void DeleteQueue()
+ public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
{
- throw new NotImplementedException(); // FIXME
+ DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait);
}
- public MessageConsumerBuilder CreateConsumerBuilder(string queueName)
+ private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
{
- return new MessageConsumerBuilder(this, queueName);
- }
-
- public MessagePublisherBuilder CreatePublisherBuilder()
- {
- return new MessagePublisherBuilder(this);
- }
-
- internal void BasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate,
- AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive,
- bool disableTimestamps)
- {
- DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps);
- }
-
- private void DoBasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode deliveryMode, uint timeToLive, int priority, bool disableTimestamps)
- {
- AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(_channelId, 0, exchangeName,
- routingKey, mandatory, immediate);
-
- long currentTime = 0;
- if (!disableTimestamps)
+ try
{
- currentTime = DateTime.UtcNow.Ticks;
- message.Timestamp = currentTime;
- }
+ _logger.Debug(string.Format("DeleteQueue name={0}", queueName));
+
+ AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0,
+ queueName, // queueName
+ ifUnused, // IfUnUsed
+ ifEmpty, // IfEmpty
+ noWait); // NoWait
- ByteBuffer buf = message.Data;
- byte[] payload = null;
- if (buf != null)
- {
- payload = new byte[buf.Remaining];
- buf.GetBytes(payload);
- }
- BasicContentHeaderProperties contentHeaderProperties = message.ContentHeaderProperties;
+ _replayFrames.Add(queueDelete);
- if (timeToLive > 0)
- {
- if (!disableTimestamps)
- {
- contentHeaderProperties.Expiration = currentTime + timeToLive;
- }
- }
- else
- {
- contentHeaderProperties.Expiration = 0;
- }
- contentHeaderProperties.SetDeliveryMode(deliveryMode);
- contentHeaderProperties.Priority = (byte)priority;
-
- ContentBody[] contentBodies = CreateContentBodies(payload);
- AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
- for (int i = 0; i < contentBodies.Length; i++)
- {
- frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
- }
- if (contentBodies.Length > 0 && _logger.IsDebugEnabled)
- {
- _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ if (noWait)
+ _connection.ProtocolWriter.Write(queueDelete);
+ else
+ _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody));
}
-
- // weight argument of zero indicates no child content headers, just bodies
- AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(_channelId, BASIC_CONTENT_TYPE, 0, contentHeaderProperties,
- (uint)payload.Length);
- if (_logger.IsDebugEnabled)
+ catch (AMQException)
{
- _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ throw;
}
+ }
- frames[0] = publishFrame;
- frames[1] = contentHeaderFrame;
- CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
-
- lock (_connection.FailoverMutex) {
- _connection.ProtocolWriter.Write(compositeFrame);
- }
+ public MessageConsumerBuilder CreateConsumerBuilder(string queueName)
+ {
+ return new MessageConsumerBuilder(this, queueName);
}
- /// <summary>
- /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
- /// maximum frame size.
- /// </summary>
- /// <param name="payload"></param>
- /// <returns>return the array of content bodies</returns>
- private ContentBody[] CreateContentBodies(byte[] payload)
+ public MessagePublisherBuilder CreatePublisherBuilder()
{
- if (payload == null)
- {
- return null;
- }
- else if (payload.Length == 0)
- {
- return new ContentBody[0];
- }
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
- long framePayloadMax = Connection.MaximumFrameSize - 1;
- int lastFrame = (payload.Length % framePayloadMax) > 0 ? 1 : 0;
- int frameCount = (int)(payload.Length / framePayloadMax) + lastFrame;
- ContentBody[] bodies = new ContentBody[frameCount];
-
- if (frameCount == 1)
- {
- bodies[0] = new ContentBody();
- bodies[0].Payload = payload;
- }
- else
- {
- long remaining = payload.Length;
- for (int i = 0; i < bodies.Length; i++)
- {
- bodies[i] = new ContentBody();
- byte[] framePayload = new byte[(remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining];
- Array.Copy(payload, (int)framePayloadMax * i, framePayload, 0, framePayload.Length);
- bodies[i].Payload = framePayload;
- remaining -= framePayload.Length;
- }
- }
- return bodies;
+ return new MessagePublisherBuilder(this);
}
public string GenerateUniqueName()