summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client
diff options
context:
space:
mode:
authorTomas Restrepo <tomasr@apache.org>2007-05-17 23:50:50 +0000
committerTomas Restrepo <tomasr@apache.org>2007-05-17 23:50:50 +0000
commitbc745c6e2e2dbc1e14e2cd8582be9d6995c8fb7f (patch)
tree2e7e3a0a4213de2daf75870ce0579c73def35c89 /dotnet/Qpid.Client
parentb83eedda6337439bde05afef0790f6a70929db00 (diff)
downloadqpid-python-bc745c6e2e2dbc1e14e2cd8582be9d6995c8fb7f.tar.gz
* QPID-492 Fix Race condition in message decoding
* QPID-249 Make ServiceRequestingClient and ServiceProvidingClient a single, self contained test * Fix incorrect exception message in Qpid.Buffers, improve tests * Make ContentBody use an sliced buffer to avoid extra data copy * Remove useless tests in Qpid.Client (Blocking IO tests) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@539178 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client')
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs115
-rw-r--r--dotnet/Qpid.Client/Client/BasicMessageProducer.cs578
-rw-r--r--dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs4
-rw-r--r--dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs2
-rw-r--r--dotnet/Qpid.Client/Client/Message/QpidHeaders.cs24
-rw-r--r--dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs2
-rw-r--r--dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs61
-rw-r--r--dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs4
-rw-r--r--dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs59
-rw-r--r--dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs31
-rw-r--r--dotnet/Qpid.Client/Qpid.Client.csproj1
11 files changed, 470 insertions, 411 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
index 0d0c48303a..56202e0094 100644
--- a/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -36,7 +36,7 @@ namespace Qpid.Client
{
private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel));
- private const int BASIC_CONTENT_TYPE = 60;
+ internal const int BASIC_CONTENT_TYPE = 60;
private static int _nextSessionNumber = 0;
@@ -763,119 +763,6 @@ namespace Qpid.Client
return new MessagePublisherBuilder(this);
}
- internal void BasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate,
- AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive,
- bool disableTimestamps)
- {
- DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps);
- }
-
- private void DoBasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode deliveryMode, uint timeToLive, int priority, bool disableTimestamps)
- {
- AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(_channelId, 0, exchangeName,
- routingKey, mandatory, immediate);
-
- long currentTime = 0;
- if (!disableTimestamps)
- {
- currentTime = DateTime.UtcNow.Ticks;
- message.Timestamp = currentTime;
- }
-
- ByteBuffer buf = message.Data;
- byte[] payload = null;
- if (buf != null)
- {
- payload = new byte[buf.Remaining];
- buf.GetBytes(payload);
- }
- BasicContentHeaderProperties contentHeaderProperties = message.ContentHeaderProperties;
-
- if (timeToLive > 0)
- {
- if (!disableTimestamps)
- {
- contentHeaderProperties.Expiration = currentTime + timeToLive;
- }
- }
- else
- {
- contentHeaderProperties.Expiration = 0;
- }
- contentHeaderProperties.SetDeliveryMode(deliveryMode);
- contentHeaderProperties.Priority = (byte)priority;
-
- ContentBody[] contentBodies = CreateContentBodies(payload);
- AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
- for (int i = 0; i < contentBodies.Length; i++)
- {
- frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
- }
- if (contentBodies.Length > 0 && _logger.IsDebugEnabled)
- {
- _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
- }
-
- // weight argument of zero indicates no child content headers, just bodies
- AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(_channelId, BASIC_CONTENT_TYPE, 0, contentHeaderProperties,
- (uint)payload.Length);
- if (_logger.IsDebugEnabled)
- {
- _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
- }
-
- frames[0] = publishFrame;
- frames[1] = contentHeaderFrame;
- CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
-
- lock (_connection.FailoverMutex) {
- _connection.ProtocolWriter.Write(compositeFrame);
- }
- }
-
- /// <summary>
- /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
- /// maximum frame size.
- /// </summary>
- /// <param name="payload"></param>
- /// <returns>return the array of content bodies</returns>
- private ContentBody[] CreateContentBodies(byte[] payload)
- {
- if (payload == null)
- {
- return null;
- }
- else if (payload.Length == 0)
- {
- return new ContentBody[0];
- }
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
- long framePayloadMax = Connection.MaximumFrameSize - 1;
- int lastFrame = (payload.Length % framePayloadMax) > 0 ? 1 : 0;
- int frameCount = (int)(payload.Length / framePayloadMax) + lastFrame;
- ContentBody[] bodies = new ContentBody[frameCount];
-
- if (frameCount == 1)
- {
- bodies[0] = new ContentBody();
- bodies[0].Payload = payload;
- }
- else
- {
- long remaining = payload.Length;
- for (int i = 0; i < bodies.Length; i++)
- {
- bodies[i] = new ContentBody();
- byte[] framePayload = new byte[(remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining];
- Array.Copy(payload, (int)framePayloadMax * i, framePayload, 0, framePayload.Length);
- bodies[i].Payload = framePayload;
- remaining -= framePayload.Length;
- }
- }
- return bodies;
- }
-
public string GenerateUniqueName()
{
string result = _connection.ProtocolSession.GenerateQueueName();
diff --git a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
index 759ffd62e3..fd430694df 100644
--- a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
+++ b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
@@ -21,255 +21,359 @@
using System;
using System.Threading;
using log4net;
+using Qpid.Buffer;
using Qpid.Client.Message;
using Qpid.Messaging;
+using Qpid.Framing;
namespace Qpid.Client
{
- public class BasicMessageProducer : Closeable, IMessagePublisher
- {
- protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
-
- /// <summary>
- /// If true, messages will not get a timestamp.
- /// </summary>
- private bool _disableTimestamps;
-
- /// <summary>
- /// Priority of messages created by this producer.
- /// </summary>
- private int _messagePriority;
-
- /// <summary>
- /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
- ///
- private long _timeToLive;
-
- /// <summary>
- /// Delivery mode used for this producer.
- /// </summary>
- private DeliveryMode _deliveryMode;
-
- private bool _immediate;
- private bool _mandatory;
-
- string _exchangeName;
- string _routingKey;
-
- /// <summary>
- /// Default encoding used for messages produced by this producer.
- /// </summary>
- private string _encoding;
-
- /// <summary>
- /// Default encoding used for message produced by this producer.
- /// </summary>
- private string _mimeType;
-
- /// <summary>
- /// True if this producer was created from a transacted session
- /// </summary>
- private bool _transacted;
-
- private ushort _channelId;
-
- /// <summary>
- /// This is an id generated by the session and is used to tie individual producers to the session. This means we
- /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
- /// to the session so that when an error is propagated to the session it can close the producer (meaning that
- /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
- /// </summary>
- private long _producerId;
-
- /// <summary>
- /// The session used to create this producer
- /// </summary>
- private AmqChannel _channel;
-
- /// <summary>
- /// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue
- /// </summary>
- protected const bool DEFAULT_IMMEDIATE = false;
-
- /// <summary>
- /// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is
- /// connected to the exchange for the message
- /// </summary>
- protected const bool DEFAULT_MANDATORY = true;
-
- public BasicMessageProducer(string exchangeName, string routingKey,
- bool transacted,
- ushort channelId,
- AmqChannel channel,
- long producerId,
- DeliveryMode deliveryMode,
- long timeToLive,
- bool immediate,
- bool mandatory,
- int priority)
- {
- _exchangeName = exchangeName;
- _routingKey = routingKey;
- _transacted = transacted;
- _channelId = channelId;
- _channel = channel;
- _producerId = producerId;
- _deliveryMode = deliveryMode;
- _timeToLive = timeToLive;
- _immediate = immediate;
- _mandatory = mandatory;
- _messagePriority = priority;
-
- _channel.RegisterProducer(producerId, this);
- }
-
-
- #region IMessagePublisher Members
-
- public DeliveryMode DeliveryMode
- {
- get
- {
- CheckNotClosed();
- return _deliveryMode;
- }
- set
- {
- CheckNotClosed();
- _deliveryMode = value;
- }
- }
-
- public string ExchangeName
- {
- get { return _exchangeName; }
- }
-
- public string RoutingKey
- {
- get { return _routingKey; }
- }
-
- public bool DisableMessageID
- {
- get
- {
- throw new Exception("The method or operation is not implemented.");
- }
- set
- {
- throw new Exception("The method or operation is not implemented.");
- }
- }
-
- public bool DisableMessageTimestamp
- {
- get
- {
- CheckNotClosed();
- return _disableTimestamps;
- }
- set
- {
- CheckNotClosed();
- _disableTimestamps = value;
- }
- }
-
- public int Priority
- {
- get
- {
- CheckNotClosed();
- return _messagePriority;
- }
- set
- {
- CheckNotClosed();
- if (value < 0 || value > 9)
- {
- throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
- }
- _messagePriority = value;
- }
- }
-
- public override void Close()
- {
- _logger.Debug("Closing producer " + this);
- Interlocked.Exchange(ref _closed, CLOSED);
- _channel.DeregisterProducer(_producerId);
- }
-
- public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
- {
+ public class BasicMessageProducer : Closeable, IMessagePublisher
+ {
+ protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
+
+ /// <summary>
+ /// If true, messages will not get a timestamp.
+ /// </summary>
+ private bool _disableTimestamps;
+
+ /// <summary>
+ /// Priority of messages created by this producer.
+ /// </summary>
+ private int _messagePriority;
+
+ /// <summary>
+ /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
+ ///
+ private long _timeToLive;
+
+ /// <summary>
+ /// Delivery mode used for this producer.
+ /// </summary>
+ private DeliveryMode _deliveryMode;
+
+ private bool _immediate;
+ private bool _mandatory;
+
+ string _exchangeName;
+ string _routingKey;
+
+ /// <summary>
+ /// Default encoding used for messages produced by this producer.
+ /// </summary>
+ private string _encoding;
+
+ /// <summary>
+ /// Default encoding used for message produced by this producer.
+ /// </summary>
+ private string _mimeType;
+
+ /// <summary>
+ /// True if this producer was created from a transacted session
+ /// </summary>
+ private bool _transacted;
+
+ private ushort _channelId;
+
+ /// <summary>
+ /// This is an id generated by the session and is used to tie individual producers to the session. This means we
+ /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
+ /// to the session so that when an error is propagated to the session it can close the producer (meaning that
+ /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
+ /// </summary>
+ private long _producerId;
+
+ /// <summary>
+ /// The session used to create this producer
+ /// </summary>
+ private AmqChannel _channel;
+
+ /// <summary>
+ /// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue
+ /// </summary>
+ protected const bool DEFAULT_IMMEDIATE = false;
+
+ /// <summary>
+ /// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is
+ /// connected to the exchange for the message
+ /// </summary>
+ protected const bool DEFAULT_MANDATORY = true;
+
+ public BasicMessageProducer(string exchangeName, string routingKey,
+ bool transacted,
+ ushort channelId,
+ AmqChannel channel,
+ long producerId,
+ DeliveryMode deliveryMode,
+ long timeToLive,
+ bool immediate,
+ bool mandatory,
+ int priority)
+ {
+ _exchangeName = exchangeName;
+ _routingKey = routingKey;
+ _transacted = transacted;
+ _channelId = channelId;
+ _channel = channel;
+ _producerId = producerId;
+ _deliveryMode = deliveryMode;
+ _timeToLive = timeToLive;
+ _immediate = immediate;
+ _mandatory = mandatory;
+ _messagePriority = priority;
+
+ _channel.RegisterProducer(producerId, this);
+ }
+
+
+ #region IMessagePublisher Members
+
+ public DeliveryMode DeliveryMode
+ {
+ get
+ {
CheckNotClosed();
- SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive, DEFAULT_MANDATORY,
- DEFAULT_IMMEDIATE);
- }
-
- public void Send(IMessage msg)
- {
+ return _deliveryMode;
+ }
+ set
+ {
CheckNotClosed();
- SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
- DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
- }
-
- // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
- // to facilitate publishing messages to potentially non-existent recipients.
- public void Send(IMessage msg, bool mandatory)
- {
+ _deliveryMode = value;
+ }
+ }
+
+ public string ExchangeName
+ {
+ get { return _exchangeName; }
+ }
+
+ public string RoutingKey
+ {
+ get { return _routingKey; }
+ }
+
+ public bool DisableMessageID
+ {
+ get
+ {
+ throw new Exception("The method or operation is not implemented.");
+ }
+ set
+ {
+ throw new Exception("The method or operation is not implemented.");
+ }
+ }
+
+ public bool DisableMessageTimestamp
+ {
+ get
+ {
CheckNotClosed();
- SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
- mandatory, DEFAULT_IMMEDIATE);
- }
-
- public long TimeToLive
- {
- get
- {
- CheckNotClosed();
- return _timeToLive;
- }
- set
+ return _disableTimestamps;
+ }
+ set
+ {
+ CheckNotClosed();
+ _disableTimestamps = value;
+ }
+ }
+
+ public int Priority
+ {
+ get
+ {
+ CheckNotClosed();
+ return _messagePriority;
+ }
+ set
+ {
+ CheckNotClosed();
+ if ( value < 0 || value > 9 )
{
- CheckNotClosed();
- if (value < 0)
- {
- throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
- }
- _timeToLive = value;
+ throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
}
- }
-
- #endregion
-
- private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
- {
- _channel.BasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, priority, timeToLive, _disableTimestamps);
- }
-
- public string MimeType
- {
- set
+ _messagePriority = value;
+ }
+ }
+
+ public override void Close()
+ {
+ _logger.Debug("Closing producer " + this);
+ Interlocked.Exchange(ref _closed, CLOSED);
+ _channel.DeregisterProducer(_producerId);
+ }
+
+ public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
+ {
+ CheckNotClosed();
+ SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive, DEFAULT_MANDATORY,
+ DEFAULT_IMMEDIATE);
+ }
+
+ public void Send(IMessage msg)
+ {
+ CheckNotClosed();
+ SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
+ DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
+ }
+
+ // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
+ // to facilitate publishing messages to potentially non-existent recipients.
+ public void Send(IMessage msg, bool mandatory)
+ {
+ CheckNotClosed();
+ SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
+ mandatory, DEFAULT_IMMEDIATE);
+ }
+
+ public long TimeToLive
+ {
+ get
+ {
+ CheckNotClosed();
+ return _timeToLive;
+ }
+ set
+ {
+ CheckNotClosed();
+ if ( value < 0 )
{
- CheckNotClosed();
- _mimeType = value;
+ throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
}
- }
+ _timeToLive = value;
+ }
+ }
- public string Encoding
- {
- set
- {
- CheckNotClosed();
- _encoding = value;
- }
- }
+ #endregion
- public void Dispose()
- {
- Close();
- }
- }
+ public string MimeType
+ {
+ set
+ {
+ CheckNotClosed();
+ _mimeType = value;
+ }
+ }
+
+ public string Encoding
+ {
+ set
+ {
+ CheckNotClosed();
+ _encoding = value;
+ }
+ }
+
+ public void Dispose()
+ {
+ Close();
+ }
+
+ #region Message Publishing
+
+ private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
+ {
+ // todo: handle session access ticket
+ AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(
+ _channel.ChannelId, 0, exchangeName,
+ routingKey, mandatory, immediate
+ );
+
+ // fix message properties
+ if ( !_disableTimestamps )
+ {
+ message.Timestamp = DateTime.UtcNow.Ticks;
+ message.Expiration = message.Timestamp + timeToLive;
+ } else
+ {
+ message.Expiration = 0;
+ }
+ message.DeliveryMode = deliveryMode;
+ message.Priority = (byte)priority;
+
+ ByteBuffer payload = message.Data;
+ int payloadLength = payload.Limit;
+
+ ContentBody[] contentBodies = CreateContentBodies(payload);
+ AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
+ for ( int i = 0; i < contentBodies.Length; i++ )
+ {
+ frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
+ }
+ if ( contentBodies.Length > 0 && _logger.IsDebugEnabled )
+ {
+ _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+
+ // weight argument of zero indicates no child content headers, just bodies
+ AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(
+ _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0,
+ message.ContentHeaderProperties, (uint)payloadLength
+ );
+ if ( _logger.IsDebugEnabled )
+ {
+ _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+
+ frames[0] = publishFrame;
+ frames[1] = contentHeaderFrame;
+ CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+
+ lock ( _channel.Connection.FailoverMutex )
+ {
+ _channel.Connection.ProtocolWriter.Write(compositeFrame);
+ }
+ }
+
+
+ /// <summary>
+ /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+ /// maximum frame size.
+ /// </summary>
+ /// <param name="payload"></param>
+ /// <returns>return the array of content bodies</returns>
+ private ContentBody[] CreateContentBodies(ByteBuffer payload)
+ {
+ if ( payload == null )
+ {
+ return null;
+ } else if ( payload.Remaining == 0 )
+ {
+ return new ContentBody[0];
+ }
+ // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+ // (0xCE byte).
+ int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1);
+ int frameCount = CalculateContentBodyFrames(payload);
+ ContentBody[] bodies = new ContentBody[frameCount];
+ for ( int i = 0; i < frameCount; i++ )
+ {
+ int length = (payload.Remaining >= framePayloadMax)
+ ? framePayloadMax : payload.Remaining;
+ bodies[i] = new ContentBody(payload, (uint)length);
+ }
+ return bodies;
+ }
+
+ private int CalculateContentBodyFrames(ByteBuffer payload)
+ {
+ // we substract one from the total frame maximum size to account
+ // for the end of frame marker in a body frame
+ // (0xCE byte).
+ int frameCount;
+ if ( (payload == null) || (payload.Remaining == 0) )
+ {
+ frameCount = 0;
+ } else
+ {
+ int dataLength = payload.Remaining;
+ int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1;
+ int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
+ frameCount = (int)(dataLength / framePayloadMax) + lastFrame;
+ }
+
+ return frameCount;
+ }
+ #endregion // Message Publishing
+ }
}
diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs
index 41e9d2240c..afcbd26781 100644
--- a/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs
+++ b/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs
@@ -45,14 +45,12 @@ namespace Qpid.Client.Handler
_logger.Debug(String.Format("ConnectionTune.heartbeat = {0}.", frame.Heartbeat));
parameters.FrameMax = frame.FrameMax;
- parameters.FrameMax = 65535;
- //params.setChannelMax(frame.channelMax);
parameters.Heartbeat = frame.Heartbeat;
session.ConnectionTuneParameters = parameters;
stateManager.ChangeState(AMQState.CONNECTION_NOT_OPENED);
session.WriteFrame(ConnectionTuneOkBody.CreateAMQFrame(
- evt.ChannelId, frame.ChannelMax, 65535, frame.Heartbeat));
+ evt.ChannelId, frame.ChannelMax, frame.FrameMax, frame.Heartbeat));
session.WriteFrame(ConnectionOpenBody.CreateAMQFrame(
evt.ChannelId, session.AMQConnection.VirtualHost, null, true));
diff --git a/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
index 61b308696c..a7ee085a04 100644
--- a/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
+++ b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
@@ -43,7 +43,7 @@ namespace Qpid.Client.Message
if (bodies != null && bodies.Count == 1)
{
_logger.Debug("Non-fragmented message body (bodySize=" + contentHeader.BodySize +")");
- data = ByteBuffer.Wrap(((ContentBody)bodies[0]).Payload);
+ data = ((ContentBody)bodies[0]).Payload;
}
else
{
diff --git a/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs b/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs
index 6538fcbefc..a258c82d15 100644
--- a/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs
+++ b/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs
@@ -28,16 +28,10 @@ namespace Qpid.Client.Message
_headers.Clear();
}
- public string this[string name]
+ public object this[string name]
{
- get
- {
- return GetString(name);
- }
- set
- {
- SetString(name, value);
- }
+ get { return GetObject(name); }
+ set { SetObject(name, value); }
}
public bool GetBoolean(string name)
@@ -167,6 +161,18 @@ namespace Qpid.Client.Message
_headers.SetString(propertyName, value);
}
+ public object GetObject(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ return _headers[propertyName];
+ }
+
+ public void SetObject(string propertyName, object value)
+ {
+ CheckPropertyName(propertyName);
+ _headers[propertyName] = value;
+ }
+
private static void CheckPropertyName(string propertyName)
{
if ( propertyName == null )
diff --git a/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs b/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs
index cb4e64718b..b64c8e1c27 100644
--- a/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs
+++ b/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs
@@ -43,7 +43,7 @@ namespace Qpid.Client.Message
Bodies.Add(body);
if (body.Payload != null)
{
- _bytesReceived += (uint)body.Payload.Length;
+ _bytesReceived += (uint)body.Payload.Remaining;
}
}
diff --git a/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs b/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
index ca7ffad8b3..4e4ca03322 100644
--- a/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
+++ b/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
@@ -33,35 +33,39 @@ namespace Qpid.Client.Transport
// Warning: don't use this log for regular logging.
static readonly ILog _protocolTraceLog = LogManager.GetLogger("Qpid.Client.ProtocolChannel.Tracing");
- IByteChannel byteChannel;
- IProtocolEncoder encoder;
- IProtocolDecoder decoder;
+ IByteChannel _byteChannel;
+ IProtocolEncoder _encoder;
+ IProtocolDecoder _decoder;
+ IProtocolDecoderOutput _decoderOutput;
+ private object _syncLock;
- public AmqpChannel(IByteChannel byteChannel)
+ public AmqpChannel(IByteChannel byteChannel, IProtocolDecoderOutput decoderOutput)
{
- this.byteChannel = byteChannel;
+ _byteChannel = byteChannel;
+ _decoderOutput = decoderOutput;
+ _syncLock = new object();
AMQProtocolProvider protocolProvider = new AMQProtocolProvider();
IProtocolCodecFactory factory = protocolProvider.CodecFactory;
- encoder = factory.Encoder;
- decoder = factory.Decoder;
+ _encoder = factory.Encoder;
+ _decoder = factory.Decoder;
}
- public Queue Read()
+ public void Read()
{
- ByteBuffer buffer = byteChannel.Read();
- return DecodeAndTrace(buffer);
+ ByteBuffer buffer = _byteChannel.Read();
+ Decode(buffer);
}
public IAsyncResult BeginRead(AsyncCallback callback, object state)
{
- return byteChannel.BeginRead(callback, state);
+ return _byteChannel.BeginRead(callback, state);
}
- public Queue EndRead(IAsyncResult result)
+ public void EndRead(IAsyncResult result)
{
- ByteBuffer buffer = byteChannel.EndRead(result);
- return DecodeAndTrace(buffer);
+ ByteBuffer buffer = _byteChannel.EndRead(result);
+ Decode(buffer);
}
public void Write(IDataBlock o)
@@ -74,43 +78,32 @@ namespace Qpid.Client.Transport
// we should be doing an async write, but apparently
// the mentalis library doesn't queue async read/writes
// correctly and throws random IOException's. Stay sync for a while
- //byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null);
- byteChannel.Write(Encode(o));
+ //_byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null);
+ _byteChannel.Write(Encode(o));
}
private void OnAsyncWriteDone(IAsyncResult result)
{
- byteChannel.EndWrite(result);
+ _byteChannel.EndWrite(result);
}
- private Queue DecodeAndTrace(ByteBuffer buffer)
+ private void Decode(ByteBuffer buffer)
{
- Queue frames = Decode(buffer);
-
- // TODO: Refactor to decorator.
- if ( _protocolTraceLog.IsDebugEnabled )
+ // make sure we don't try to decode more than
+ // one buffer at the same time
+ lock ( _syncLock )
{
- foreach ( object o in frames )
- {
- _protocolTraceLog.Debug(String.Format("READ {0}", o));
- }
+ _decoder.Decode(buffer, _decoderOutput);
}
- return frames;
}
private ByteBuffer Encode(object o)
{
SingleProtocolEncoderOutput output = new SingleProtocolEncoderOutput();
- encoder.Encode(o, output);
+ _encoder.Encode(o, output);
return output.buffer;
}
- private Queue Decode(ByteBuffer byteBuffer)
- {
- SimpleProtocolDecoderOutput outx = new SimpleProtocolDecoderOutput();
- decoder.Decode(byteBuffer, outx);
- return outx.MessageQueue;
- }
}
}
diff --git a/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs b/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
index 0379e582d6..e4d4d2ed29 100644
--- a/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
+++ b/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
@@ -25,8 +25,8 @@ namespace Qpid.Client.Transport
{
public interface IProtocolChannel : IProtocolWriter
{
- Queue Read();
+ void Read();
IAsyncResult BeginRead(AsyncCallback callback, object state);
- Queue EndRead(IAsyncResult result);
+ void EndRead(IAsyncResult result);
}
}
diff --git a/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs b/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs
new file mode 100644
index 0000000000..07df62ea84
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using Qpid.Client.Protocol;
+using Qpid.Codec;
+using Qpid.Framing;
+using log4net;
+
+namespace Qpid.Client.Transport
+{
+ /// <summary>
+ /// <see cref="IProtocolDecoderOutput"/> implementation that forwards
+ /// each <see cref="IDataBlock"/> as it is decoded to the
+ /// protocol listener
+ /// </summary>
+ internal class ProtocolDecoderOutput : IProtocolDecoderOutput
+ {
+ private IProtocolListener _protocolListener;
+ static readonly ILog _protocolTraceLog = LogManager.GetLogger("Qpid.Client.ProtocolChannel.Tracing");
+
+ public ProtocolDecoderOutput(IProtocolListener protocolListener)
+ {
+ if ( protocolListener == null )
+ throw new ArgumentNullException("protocolListener");
+
+ _protocolListener = protocolListener;
+ }
+
+ public void Write(object message)
+ {
+ IDataBlock block = message as IDataBlock;
+ if ( block != null )
+ {
+ _protocolTraceLog.Debug(String.Format("READ {0}", block));
+ _protocolListener.OnMessage(block);
+ }
+ }
+ }
+} // namespace Qpid.Client.Transport
+
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
index 1fb07fb245..2895c75431 100644
--- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
+++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
@@ -24,6 +24,7 @@ using System.IO;
using System.Threading;
using Qpid.Client.Qms;
using Qpid.Client.Protocol;
+using Qpid.Codec;
using Qpid.Framing;
namespace Qpid.Client.Transport.Socket.Blocking
@@ -66,7 +67,11 @@ namespace Qpid.Client.Transport.Socket.Blocking
_ioHandler = MakeBrokerConnection(broker, connection);
// todo: get default read size from config!
- _amqpChannel = new AmqpChannel(new ByteChannel(_ioHandler));
+ IProtocolDecoderOutput decoderOutput =
+ new ProtocolDecoderOutput(_protocolListener);
+ _amqpChannel =
+ new AmqpChannel(new ByteChannel(_ioHandler), decoderOutput);
+
// post an initial async read
_amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this);
}
@@ -117,22 +122,28 @@ namespace Qpid.Client.Transport.Socket.Blocking
{
try
{
- Queue frames = _amqpChannel.EndRead(result);
-
- // process results
- foreach ( IDataBlock dataBlock in frames )
- {
- _protocolListener.OnMessage(dataBlock);
- }
- // if we're not stopping, post a read again
+ _amqpChannel.EndRead(result);
+
bool stopping = _stopEvent.WaitOne(0, false);
if ( !stopping )
_amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null);
} catch ( Exception e )
{
- _protocolListener.OnException(e);
+ // ignore any errors during closing
+ bool stopping = _stopEvent.WaitOne(0, false);
+ if ( !stopping )
+ _protocolListener.OnException(e);
}
}
+
+ #region IProtocolDecoderOutput Members
+
+ public void Write(object message)
+ {
+ _protocolListener.OnMessage((IDataBlock)message);
+ }
+
+ #endregion
}
}
diff --git a/dotnet/Qpid.Client/Qpid.Client.csproj b/dotnet/Qpid.Client/Qpid.Client.csproj
index 2bafeb23f6..de585be7e0 100644
--- a/dotnet/Qpid.Client/Qpid.Client.csproj
+++ b/dotnet/Qpid.Client/Qpid.Client.csproj
@@ -110,6 +110,7 @@
<Compile Include="Client\Transport\IProtocolChannel.cs" />
<Compile Include="Client\Transport\IProtocolWriter.cs" />
<Compile Include="Client\Transport\ITransport.cs" />
+ <Compile Include="Client\Transport\ProtocolDecoderOutput.cs" />
<Compile Include="Client\Transport\SingleProtocolEncoderOutput.cs" />
<Compile Include="Client\Transport\Socket\Blocking\BlockingSocketTransport.cs" />
<Compile Include="Client\Transport\Socket\Blocking\ByteChannel.cs" />