diff options
| author | Tomas Restrepo <tomasr@apache.org> | 2007-05-17 23:50:50 +0000 |
|---|---|---|
| committer | Tomas Restrepo <tomasr@apache.org> | 2007-05-17 23:50:50 +0000 |
| commit | bc745c6e2e2dbc1e14e2cd8582be9d6995c8fb7f (patch) | |
| tree | 2e7e3a0a4213de2daf75870ce0579c73def35c89 /dotnet/Qpid.Client | |
| parent | b83eedda6337439bde05afef0790f6a70929db00 (diff) | |
| download | qpid-python-bc745c6e2e2dbc1e14e2cd8582be9d6995c8fb7f.tar.gz | |
* QPID-492 Fix Race condition in message decoding
* QPID-249 Make ServiceRequestingClient and ServiceProvidingClient a single, self contained test
* Fix incorrect exception message in Qpid.Buffers, improve tests
* Make ContentBody use an sliced buffer to avoid extra data copy
* Remove useless tests in Qpid.Client (Blocking IO tests)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@539178 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client')
11 files changed, 470 insertions, 411 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 0d0c48303a..56202e0094 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -36,7 +36,7 @@ namespace Qpid.Client { private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel)); - private const int BASIC_CONTENT_TYPE = 60; + internal const int BASIC_CONTENT_TYPE = 60; private static int _nextSessionNumber = 0; @@ -763,119 +763,6 @@ namespace Qpid.Client return new MessagePublisherBuilder(this); } - internal void BasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, - AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, - bool disableTimestamps) - { - DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps); - } - - private void DoBasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode deliveryMode, uint timeToLive, int priority, bool disableTimestamps) - { - AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(_channelId, 0, exchangeName, - routingKey, mandatory, immediate); - - long currentTime = 0; - if (!disableTimestamps) - { - currentTime = DateTime.UtcNow.Ticks; - message.Timestamp = currentTime; - } - - ByteBuffer buf = message.Data; - byte[] payload = null; - if (buf != null) - { - payload = new byte[buf.Remaining]; - buf.GetBytes(payload); - } - BasicContentHeaderProperties contentHeaderProperties = message.ContentHeaderProperties; - - if (timeToLive > 0) - { - if (!disableTimestamps) - { - contentHeaderProperties.Expiration = currentTime + timeToLive; - } - } - else - { - contentHeaderProperties.Expiration = 0; - } - contentHeaderProperties.SetDeliveryMode(deliveryMode); - contentHeaderProperties.Priority = (byte)priority; - - ContentBody[] contentBodies = CreateContentBodies(payload); - AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length]; - for (int i = 0; i < contentBodies.Length; i++) - { - frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]); - } - if (contentBodies.Length > 0 && _logger.IsDebugEnabled) - { - _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); - } - - // weight argument of zero indicates no child content headers, just bodies - AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(_channelId, BASIC_CONTENT_TYPE, 0, contentHeaderProperties, - (uint)payload.Length); - if (_logger.IsDebugEnabled) - { - _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); - } - - frames[0] = publishFrame; - frames[1] = contentHeaderFrame; - CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); - - lock (_connection.FailoverMutex) { - _connection.ProtocolWriter.Write(compositeFrame); - } - } - - /// <summary> - /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated - /// maximum frame size. - /// </summary> - /// <param name="payload"></param> - /// <returns>return the array of content bodies</returns> - private ContentBody[] CreateContentBodies(byte[] payload) - { - if (payload == null) - { - return null; - } - else if (payload.Length == 0) - { - return new ContentBody[0]; - } - // we substract one from the total frame maximum size to account for the end of frame marker in a body frame - // (0xCE byte). - long framePayloadMax = Connection.MaximumFrameSize - 1; - int lastFrame = (payload.Length % framePayloadMax) > 0 ? 1 : 0; - int frameCount = (int)(payload.Length / framePayloadMax) + lastFrame; - ContentBody[] bodies = new ContentBody[frameCount]; - - if (frameCount == 1) - { - bodies[0] = new ContentBody(); - bodies[0].Payload = payload; - } - else - { - long remaining = payload.Length; - for (int i = 0; i < bodies.Length; i++) - { - bodies[i] = new ContentBody(); - byte[] framePayload = new byte[(remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining]; - Array.Copy(payload, (int)framePayloadMax * i, framePayload, 0, framePayload.Length); - bodies[i].Payload = framePayload; - remaining -= framePayload.Length; - } - } - return bodies; - } - public string GenerateUniqueName() { string result = _connection.ProtocolSession.GenerateQueueName(); diff --git a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs index 759ffd62e3..fd430694df 100644 --- a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs +++ b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs @@ -21,255 +21,359 @@ using System; using System.Threading; using log4net; +using Qpid.Buffer; using Qpid.Client.Message; using Qpid.Messaging; +using Qpid.Framing; namespace Qpid.Client { - public class BasicMessageProducer : Closeable, IMessagePublisher - { - protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer)); - - /// <summary> - /// If true, messages will not get a timestamp. - /// </summary> - private bool _disableTimestamps; - - /// <summary> - /// Priority of messages created by this producer. - /// </summary> - private int _messagePriority; - - /// <summary> - /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. - /// - private long _timeToLive; - - /// <summary> - /// Delivery mode used for this producer. - /// </summary> - private DeliveryMode _deliveryMode; - - private bool _immediate; - private bool _mandatory; - - string _exchangeName; - string _routingKey; - - /// <summary> - /// Default encoding used for messages produced by this producer. - /// </summary> - private string _encoding; - - /// <summary> - /// Default encoding used for message produced by this producer. - /// </summary> - private string _mimeType; - - /// <summary> - /// True if this producer was created from a transacted session - /// </summary> - private bool _transacted; - - private ushort _channelId; - - /// <summary> - /// This is an id generated by the session and is used to tie individual producers to the session. This means we - /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers - /// to the session so that when an error is propagated to the session it can close the producer (meaning that - /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently). - /// </summary> - private long _producerId; - - /// <summary> - /// The session used to create this producer - /// </summary> - private AmqChannel _channel; - - /// <summary> - /// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue - /// </summary> - protected const bool DEFAULT_IMMEDIATE = false; - - /// <summary> - /// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is - /// connected to the exchange for the message - /// </summary> - protected const bool DEFAULT_MANDATORY = true; - - public BasicMessageProducer(string exchangeName, string routingKey, - bool transacted, - ushort channelId, - AmqChannel channel, - long producerId, - DeliveryMode deliveryMode, - long timeToLive, - bool immediate, - bool mandatory, - int priority) - { - _exchangeName = exchangeName; - _routingKey = routingKey; - _transacted = transacted; - _channelId = channelId; - _channel = channel; - _producerId = producerId; - _deliveryMode = deliveryMode; - _timeToLive = timeToLive; - _immediate = immediate; - _mandatory = mandatory; - _messagePriority = priority; - - _channel.RegisterProducer(producerId, this); - } - - - #region IMessagePublisher Members - - public DeliveryMode DeliveryMode - { - get - { - CheckNotClosed(); - return _deliveryMode; - } - set - { - CheckNotClosed(); - _deliveryMode = value; - } - } - - public string ExchangeName - { - get { return _exchangeName; } - } - - public string RoutingKey - { - get { return _routingKey; } - } - - public bool DisableMessageID - { - get - { - throw new Exception("The method or operation is not implemented."); - } - set - { - throw new Exception("The method or operation is not implemented."); - } - } - - public bool DisableMessageTimestamp - { - get - { - CheckNotClosed(); - return _disableTimestamps; - } - set - { - CheckNotClosed(); - _disableTimestamps = value; - } - } - - public int Priority - { - get - { - CheckNotClosed(); - return _messagePriority; - } - set - { - CheckNotClosed(); - if (value < 0 || value > 9) - { - throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9"); - } - _messagePriority = value; - } - } - - public override void Close() - { - _logger.Debug("Closing producer " + this); - Interlocked.Exchange(ref _closed, CLOSED); - _channel.DeregisterProducer(_producerId); - } - - public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive) - { + public class BasicMessageProducer : Closeable, IMessagePublisher + { + protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer)); + + /// <summary> + /// If true, messages will not get a timestamp. + /// </summary> + private bool _disableTimestamps; + + /// <summary> + /// Priority of messages created by this producer. + /// </summary> + private int _messagePriority; + + /// <summary> + /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. + /// + private long _timeToLive; + + /// <summary> + /// Delivery mode used for this producer. + /// </summary> + private DeliveryMode _deliveryMode; + + private bool _immediate; + private bool _mandatory; + + string _exchangeName; + string _routingKey; + + /// <summary> + /// Default encoding used for messages produced by this producer. + /// </summary> + private string _encoding; + + /// <summary> + /// Default encoding used for message produced by this producer. + /// </summary> + private string _mimeType; + + /// <summary> + /// True if this producer was created from a transacted session + /// </summary> + private bool _transacted; + + private ushort _channelId; + + /// <summary> + /// This is an id generated by the session and is used to tie individual producers to the session. This means we + /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers + /// to the session so that when an error is propagated to the session it can close the producer (meaning that + /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently). + /// </summary> + private long _producerId; + + /// <summary> + /// The session used to create this producer + /// </summary> + private AmqChannel _channel; + + /// <summary> + /// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue + /// </summary> + protected const bool DEFAULT_IMMEDIATE = false; + + /// <summary> + /// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is + /// connected to the exchange for the message + /// </summary> + protected const bool DEFAULT_MANDATORY = true; + + public BasicMessageProducer(string exchangeName, string routingKey, + bool transacted, + ushort channelId, + AmqChannel channel, + long producerId, + DeliveryMode deliveryMode, + long timeToLive, + bool immediate, + bool mandatory, + int priority) + { + _exchangeName = exchangeName; + _routingKey = routingKey; + _transacted = transacted; + _channelId = channelId; + _channel = channel; + _producerId = producerId; + _deliveryMode = deliveryMode; + _timeToLive = timeToLive; + _immediate = immediate; + _mandatory = mandatory; + _messagePriority = priority; + + _channel.RegisterProducer(producerId, this); + } + + + #region IMessagePublisher Members + + public DeliveryMode DeliveryMode + { + get + { CheckNotClosed(); - SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive, DEFAULT_MANDATORY, - DEFAULT_IMMEDIATE); - } - - public void Send(IMessage msg) - { + return _deliveryMode; + } + set + { CheckNotClosed(); - SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive, - DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); - } - - // This is a short-term hack (knowing that this code will be re-vamped sometime soon) - // to facilitate publishing messages to potentially non-existent recipients. - public void Send(IMessage msg, bool mandatory) - { + _deliveryMode = value; + } + } + + public string ExchangeName + { + get { return _exchangeName; } + } + + public string RoutingKey + { + get { return _routingKey; } + } + + public bool DisableMessageID + { + get + { + throw new Exception("The method or operation is not implemented."); + } + set + { + throw new Exception("The method or operation is not implemented."); + } + } + + public bool DisableMessageTimestamp + { + get + { CheckNotClosed(); - SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive, - mandatory, DEFAULT_IMMEDIATE); - } - - public long TimeToLive - { - get - { - CheckNotClosed(); - return _timeToLive; - } - set + return _disableTimestamps; + } + set + { + CheckNotClosed(); + _disableTimestamps = value; + } + } + + public int Priority + { + get + { + CheckNotClosed(); + return _messagePriority; + } + set + { + CheckNotClosed(); + if ( value < 0 || value > 9 ) { - CheckNotClosed(); - if (value < 0) - { - throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value); - } - _timeToLive = value; + throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9"); } - } - - #endregion - - private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate) - { - _channel.BasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, priority, timeToLive, _disableTimestamps); - } - - public string MimeType - { - set + _messagePriority = value; + } + } + + public override void Close() + { + _logger.Debug("Closing producer " + this); + Interlocked.Exchange(ref _closed, CLOSED); + _channel.DeregisterProducer(_producerId); + } + + public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive) + { + CheckNotClosed(); + SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive, DEFAULT_MANDATORY, + DEFAULT_IMMEDIATE); + } + + public void Send(IMessage msg) + { + CheckNotClosed(); + SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive, + DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); + } + + // This is a short-term hack (knowing that this code will be re-vamped sometime soon) + // to facilitate publishing messages to potentially non-existent recipients. + public void Send(IMessage msg, bool mandatory) + { + CheckNotClosed(); + SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive, + mandatory, DEFAULT_IMMEDIATE); + } + + public long TimeToLive + { + get + { + CheckNotClosed(); + return _timeToLive; + } + set + { + CheckNotClosed(); + if ( value < 0 ) { - CheckNotClosed(); - _mimeType = value; + throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value); } - } + _timeToLive = value; + } + } - public string Encoding - { - set - { - CheckNotClosed(); - _encoding = value; - } - } + #endregion - public void Dispose() - { - Close(); - } - } + public string MimeType + { + set + { + CheckNotClosed(); + _mimeType = value; + } + } + + public string Encoding + { + set + { + CheckNotClosed(); + _encoding = value; + } + } + + public void Dispose() + { + Close(); + } + + #region Message Publishing + + private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate) + { + // todo: handle session access ticket + AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame( + _channel.ChannelId, 0, exchangeName, + routingKey, mandatory, immediate + ); + + // fix message properties + if ( !_disableTimestamps ) + { + message.Timestamp = DateTime.UtcNow.Ticks; + message.Expiration = message.Timestamp + timeToLive; + } else + { + message.Expiration = 0; + } + message.DeliveryMode = deliveryMode; + message.Priority = (byte)priority; + + ByteBuffer payload = message.Data; + int payloadLength = payload.Limit; + + ContentBody[] contentBodies = CreateContentBodies(payload); + AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length]; + for ( int i = 0; i < contentBodies.Length; i++ ) + { + frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]); + } + if ( contentBodies.Length > 0 && _logger.IsDebugEnabled ) + { + _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); + } + + // weight argument of zero indicates no child content headers, just bodies + AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame( + _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0, + message.ContentHeaderProperties, (uint)payloadLength + ); + if ( _logger.IsDebugEnabled ) + { + _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); + } + + frames[0] = publishFrame; + frames[1] = contentHeaderFrame; + CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); + + lock ( _channel.Connection.FailoverMutex ) + { + _channel.Connection.ProtocolWriter.Write(compositeFrame); + } + } + + + /// <summary> + /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated + /// maximum frame size. + /// </summary> + /// <param name="payload"></param> + /// <returns>return the array of content bodies</returns> + private ContentBody[] CreateContentBodies(ByteBuffer payload) + { + if ( payload == null ) + { + return null; + } else if ( payload.Remaining == 0 ) + { + return new ContentBody[0]; + } + // we substract one from the total frame maximum size to account for the end of frame marker in a body frame + // (0xCE byte). + int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1); + int frameCount = CalculateContentBodyFrames(payload); + ContentBody[] bodies = new ContentBody[frameCount]; + for ( int i = 0; i < frameCount; i++ ) + { + int length = (payload.Remaining >= framePayloadMax) + ? framePayloadMax : payload.Remaining; + bodies[i] = new ContentBody(payload, (uint)length); + } + return bodies; + } + + private int CalculateContentBodyFrames(ByteBuffer payload) + { + // we substract one from the total frame maximum size to account + // for the end of frame marker in a body frame + // (0xCE byte). + int frameCount; + if ( (payload == null) || (payload.Remaining == 0) ) + { + frameCount = 0; + } else + { + int dataLength = payload.Remaining; + int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1; + int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; + frameCount = (int)(dataLength / framePayloadMax) + lastFrame; + } + + return frameCount; + } + #endregion // Message Publishing + } } diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs index 41e9d2240c..afcbd26781 100644 --- a/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs +++ b/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs @@ -45,14 +45,12 @@ namespace Qpid.Client.Handler _logger.Debug(String.Format("ConnectionTune.heartbeat = {0}.", frame.Heartbeat)); parameters.FrameMax = frame.FrameMax; - parameters.FrameMax = 65535; - //params.setChannelMax(frame.channelMax); parameters.Heartbeat = frame.Heartbeat; session.ConnectionTuneParameters = parameters; stateManager.ChangeState(AMQState.CONNECTION_NOT_OPENED); session.WriteFrame(ConnectionTuneOkBody.CreateAMQFrame( - evt.ChannelId, frame.ChannelMax, 65535, frame.Heartbeat)); + evt.ChannelId, frame.ChannelMax, frame.FrameMax, frame.Heartbeat)); session.WriteFrame(ConnectionOpenBody.CreateAMQFrame( evt.ChannelId, session.AMQConnection.VirtualHost, null, true)); diff --git a/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs index 61b308696c..a7ee085a04 100644 --- a/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs +++ b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs @@ -43,7 +43,7 @@ namespace Qpid.Client.Message if (bodies != null && bodies.Count == 1) { _logger.Debug("Non-fragmented message body (bodySize=" + contentHeader.BodySize +")"); - data = ByteBuffer.Wrap(((ContentBody)bodies[0]).Payload); + data = ((ContentBody)bodies[0]).Payload; } else { diff --git a/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs b/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs index 6538fcbefc..a258c82d15 100644 --- a/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs +++ b/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs @@ -28,16 +28,10 @@ namespace Qpid.Client.Message _headers.Clear(); } - public string this[string name] + public object this[string name] { - get - { - return GetString(name); - } - set - { - SetString(name, value); - } + get { return GetObject(name); } + set { SetObject(name, value); } } public bool GetBoolean(string name) @@ -167,6 +161,18 @@ namespace Qpid.Client.Message _headers.SetString(propertyName, value); } + public object GetObject(string propertyName) + { + CheckPropertyName(propertyName); + return _headers[propertyName]; + } + + public void SetObject(string propertyName, object value) + { + CheckPropertyName(propertyName); + _headers[propertyName] = value; + } + private static void CheckPropertyName(string propertyName) { if ( propertyName == null ) diff --git a/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs b/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs index cb4e64718b..b64c8e1c27 100644 --- a/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs +++ b/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs @@ -43,7 +43,7 @@ namespace Qpid.Client.Message Bodies.Add(body); if (body.Payload != null) { - _bytesReceived += (uint)body.Payload.Length; + _bytesReceived += (uint)body.Payload.Remaining; } } diff --git a/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs b/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs index ca7ffad8b3..4e4ca03322 100644 --- a/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs +++ b/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs @@ -33,35 +33,39 @@ namespace Qpid.Client.Transport // Warning: don't use this log for regular logging. static readonly ILog _protocolTraceLog = LogManager.GetLogger("Qpid.Client.ProtocolChannel.Tracing"); - IByteChannel byteChannel; - IProtocolEncoder encoder; - IProtocolDecoder decoder; + IByteChannel _byteChannel; + IProtocolEncoder _encoder; + IProtocolDecoder _decoder; + IProtocolDecoderOutput _decoderOutput; + private object _syncLock; - public AmqpChannel(IByteChannel byteChannel) + public AmqpChannel(IByteChannel byteChannel, IProtocolDecoderOutput decoderOutput) { - this.byteChannel = byteChannel; + _byteChannel = byteChannel; + _decoderOutput = decoderOutput; + _syncLock = new object(); AMQProtocolProvider protocolProvider = new AMQProtocolProvider(); IProtocolCodecFactory factory = protocolProvider.CodecFactory; - encoder = factory.Encoder; - decoder = factory.Decoder; + _encoder = factory.Encoder; + _decoder = factory.Decoder; } - public Queue Read() + public void Read() { - ByteBuffer buffer = byteChannel.Read(); - return DecodeAndTrace(buffer); + ByteBuffer buffer = _byteChannel.Read(); + Decode(buffer); } public IAsyncResult BeginRead(AsyncCallback callback, object state) { - return byteChannel.BeginRead(callback, state); + return _byteChannel.BeginRead(callback, state); } - public Queue EndRead(IAsyncResult result) + public void EndRead(IAsyncResult result) { - ByteBuffer buffer = byteChannel.EndRead(result); - return DecodeAndTrace(buffer); + ByteBuffer buffer = _byteChannel.EndRead(result); + Decode(buffer); } public void Write(IDataBlock o) @@ -74,43 +78,32 @@ namespace Qpid.Client.Transport // we should be doing an async write, but apparently // the mentalis library doesn't queue async read/writes // correctly and throws random IOException's. Stay sync for a while - //byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null); - byteChannel.Write(Encode(o)); + //_byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null); + _byteChannel.Write(Encode(o)); } private void OnAsyncWriteDone(IAsyncResult result) { - byteChannel.EndWrite(result); + _byteChannel.EndWrite(result); } - private Queue DecodeAndTrace(ByteBuffer buffer) + private void Decode(ByteBuffer buffer) { - Queue frames = Decode(buffer); - - // TODO: Refactor to decorator. - if ( _protocolTraceLog.IsDebugEnabled ) + // make sure we don't try to decode more than + // one buffer at the same time + lock ( _syncLock ) { - foreach ( object o in frames ) - { - _protocolTraceLog.Debug(String.Format("READ {0}", o)); - } + _decoder.Decode(buffer, _decoderOutput); } - return frames; } private ByteBuffer Encode(object o) { SingleProtocolEncoderOutput output = new SingleProtocolEncoderOutput(); - encoder.Encode(o, output); + _encoder.Encode(o, output); return output.buffer; } - private Queue Decode(ByteBuffer byteBuffer) - { - SimpleProtocolDecoderOutput outx = new SimpleProtocolDecoderOutput(); - decoder.Decode(byteBuffer, outx); - return outx.MessageQueue; - } } } diff --git a/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs b/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs index 0379e582d6..e4d4d2ed29 100644 --- a/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs +++ b/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs @@ -25,8 +25,8 @@ namespace Qpid.Client.Transport { public interface IProtocolChannel : IProtocolWriter { - Queue Read(); + void Read(); IAsyncResult BeginRead(AsyncCallback callback, object state); - Queue EndRead(IAsyncResult result); + void EndRead(IAsyncResult result); } } diff --git a/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs b/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs new file mode 100644 index 0000000000..07df62ea84 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs @@ -0,0 +1,59 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using Qpid.Client.Protocol;
+using Qpid.Codec;
+using Qpid.Framing;
+using log4net;
+
+namespace Qpid.Client.Transport
+{
+ /// <summary>
+ /// <see cref="IProtocolDecoderOutput"/> implementation that forwards
+ /// each <see cref="IDataBlock"/> as it is decoded to the
+ /// protocol listener
+ /// </summary>
+ internal class ProtocolDecoderOutput : IProtocolDecoderOutput
+ {
+ private IProtocolListener _protocolListener;
+ static readonly ILog _protocolTraceLog = LogManager.GetLogger("Qpid.Client.ProtocolChannel.Tracing");
+
+ public ProtocolDecoderOutput(IProtocolListener protocolListener)
+ {
+ if ( protocolListener == null )
+ throw new ArgumentNullException("protocolListener");
+
+ _protocolListener = protocolListener;
+ }
+
+ public void Write(object message)
+ {
+ IDataBlock block = message as IDataBlock;
+ if ( block != null )
+ {
+ _protocolTraceLog.Debug(String.Format("READ {0}", block));
+ _protocolListener.OnMessage(block);
+ }
+ }
+ }
+} // namespace Qpid.Client.Transport
+
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs index 1fb07fb245..2895c75431 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs @@ -24,6 +24,7 @@ using System.IO; using System.Threading;
using Qpid.Client.Qms;
using Qpid.Client.Protocol;
+using Qpid.Codec;
using Qpid.Framing;
namespace Qpid.Client.Transport.Socket.Blocking
@@ -66,7 +67,11 @@ namespace Qpid.Client.Transport.Socket.Blocking _ioHandler = MakeBrokerConnection(broker, connection);
// todo: get default read size from config!
- _amqpChannel = new AmqpChannel(new ByteChannel(_ioHandler));
+ IProtocolDecoderOutput decoderOutput =
+ new ProtocolDecoderOutput(_protocolListener);
+ _amqpChannel =
+ new AmqpChannel(new ByteChannel(_ioHandler), decoderOutput);
+
// post an initial async read
_amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this);
}
@@ -117,22 +122,28 @@ namespace Qpid.Client.Transport.Socket.Blocking {
try
{
- Queue frames = _amqpChannel.EndRead(result);
-
- // process results
- foreach ( IDataBlock dataBlock in frames )
- {
- _protocolListener.OnMessage(dataBlock);
- }
- // if we're not stopping, post a read again
+ _amqpChannel.EndRead(result);
+
bool stopping = _stopEvent.WaitOne(0, false);
if ( !stopping )
_amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null);
} catch ( Exception e )
{
- _protocolListener.OnException(e);
+ // ignore any errors during closing
+ bool stopping = _stopEvent.WaitOne(0, false);
+ if ( !stopping )
+ _protocolListener.OnException(e);
}
}
+
+ #region IProtocolDecoderOutput Members
+
+ public void Write(object message)
+ {
+ _protocolListener.OnMessage((IDataBlock)message);
+ }
+
+ #endregion
}
}
diff --git a/dotnet/Qpid.Client/Qpid.Client.csproj b/dotnet/Qpid.Client/Qpid.Client.csproj index 2bafeb23f6..de585be7e0 100644 --- a/dotnet/Qpid.Client/Qpid.Client.csproj +++ b/dotnet/Qpid.Client/Qpid.Client.csproj @@ -110,6 +110,7 @@ <Compile Include="Client\Transport\IProtocolChannel.cs" />
<Compile Include="Client\Transport\IProtocolWriter.cs" />
<Compile Include="Client\Transport\ITransport.cs" />
+ <Compile Include="Client\Transport\ProtocolDecoderOutput.cs" />
<Compile Include="Client\Transport\SingleProtocolEncoderOutput.cs" />
<Compile Include="Client\Transport\Socket\Blocking\BlockingSocketTransport.cs" />
<Compile Include="Client\Transport\Socket\Blocking\ByteChannel.cs" />
|
