diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/Transport')
4 files changed, 109 insertions, 46 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs b/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs index ca7ffad8b3..4e4ca03322 100644 --- a/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs +++ b/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs @@ -33,35 +33,39 @@ namespace Qpid.Client.Transport // Warning: don't use this log for regular logging. static readonly ILog _protocolTraceLog = LogManager.GetLogger("Qpid.Client.ProtocolChannel.Tracing"); - IByteChannel byteChannel; - IProtocolEncoder encoder; - IProtocolDecoder decoder; + IByteChannel _byteChannel; + IProtocolEncoder _encoder; + IProtocolDecoder _decoder; + IProtocolDecoderOutput _decoderOutput; + private object _syncLock; - public AmqpChannel(IByteChannel byteChannel) + public AmqpChannel(IByteChannel byteChannel, IProtocolDecoderOutput decoderOutput) { - this.byteChannel = byteChannel; + _byteChannel = byteChannel; + _decoderOutput = decoderOutput; + _syncLock = new object(); AMQProtocolProvider protocolProvider = new AMQProtocolProvider(); IProtocolCodecFactory factory = protocolProvider.CodecFactory; - encoder = factory.Encoder; - decoder = factory.Decoder; + _encoder = factory.Encoder; + _decoder = factory.Decoder; } - public Queue Read() + public void Read() { - ByteBuffer buffer = byteChannel.Read(); - return DecodeAndTrace(buffer); + ByteBuffer buffer = _byteChannel.Read(); + Decode(buffer); } public IAsyncResult BeginRead(AsyncCallback callback, object state) { - return byteChannel.BeginRead(callback, state); + return _byteChannel.BeginRead(callback, state); } - public Queue EndRead(IAsyncResult result) + public void EndRead(IAsyncResult result) { - ByteBuffer buffer = byteChannel.EndRead(result); - return DecodeAndTrace(buffer); + ByteBuffer buffer = _byteChannel.EndRead(result); + Decode(buffer); } public void Write(IDataBlock o) @@ -74,43 +78,32 @@ namespace Qpid.Client.Transport // we should be doing an async write, but apparently // the mentalis library doesn't queue async read/writes // correctly and throws random IOException's. Stay sync for a while - //byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null); - byteChannel.Write(Encode(o)); + //_byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null); + _byteChannel.Write(Encode(o)); } private void OnAsyncWriteDone(IAsyncResult result) { - byteChannel.EndWrite(result); + _byteChannel.EndWrite(result); } - private Queue DecodeAndTrace(ByteBuffer buffer) + private void Decode(ByteBuffer buffer) { - Queue frames = Decode(buffer); - - // TODO: Refactor to decorator. - if ( _protocolTraceLog.IsDebugEnabled ) + // make sure we don't try to decode more than + // one buffer at the same time + lock ( _syncLock ) { - foreach ( object o in frames ) - { - _protocolTraceLog.Debug(String.Format("READ {0}", o)); - } + _decoder.Decode(buffer, _decoderOutput); } - return frames; } private ByteBuffer Encode(object o) { SingleProtocolEncoderOutput output = new SingleProtocolEncoderOutput(); - encoder.Encode(o, output); + _encoder.Encode(o, output); return output.buffer; } - private Queue Decode(ByteBuffer byteBuffer) - { - SimpleProtocolDecoderOutput outx = new SimpleProtocolDecoderOutput(); - decoder.Decode(byteBuffer, outx); - return outx.MessageQueue; - } } } diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs b/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs index 0379e582d6..e4d4d2ed29 100644 --- a/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs +++ b/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs @@ -25,8 +25,8 @@ namespace Qpid.Client.Transport { public interface IProtocolChannel : IProtocolWriter { - Queue Read(); + void Read(); IAsyncResult BeginRead(AsyncCallback callback, object state); - Queue EndRead(IAsyncResult result); + void EndRead(IAsyncResult result); } } diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs b/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs new file mode 100644 index 0000000000..07df62ea84 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs @@ -0,0 +1,59 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using Qpid.Client.Protocol;
+using Qpid.Codec;
+using Qpid.Framing;
+using log4net;
+
+namespace Qpid.Client.Transport
+{
+ /// <summary>
+ /// <see cref="IProtocolDecoderOutput"/> implementation that forwards
+ /// each <see cref="IDataBlock"/> as it is decoded to the
+ /// protocol listener
+ /// </summary>
+ internal class ProtocolDecoderOutput : IProtocolDecoderOutput
+ {
+ private IProtocolListener _protocolListener;
+ static readonly ILog _protocolTraceLog = LogManager.GetLogger("Qpid.Client.ProtocolChannel.Tracing");
+
+ public ProtocolDecoderOutput(IProtocolListener protocolListener)
+ {
+ if ( protocolListener == null )
+ throw new ArgumentNullException("protocolListener");
+
+ _protocolListener = protocolListener;
+ }
+
+ public void Write(object message)
+ {
+ IDataBlock block = message as IDataBlock;
+ if ( block != null )
+ {
+ _protocolTraceLog.Debug(String.Format("READ {0}", block));
+ _protocolListener.OnMessage(block);
+ }
+ }
+ }
+} // namespace Qpid.Client.Transport
+
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs index 1fb07fb245..2895c75431 100644 --- a/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs +++ b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs @@ -24,6 +24,7 @@ using System.IO; using System.Threading;
using Qpid.Client.Qms;
using Qpid.Client.Protocol;
+using Qpid.Codec;
using Qpid.Framing;
namespace Qpid.Client.Transport.Socket.Blocking
@@ -66,7 +67,11 @@ namespace Qpid.Client.Transport.Socket.Blocking _ioHandler = MakeBrokerConnection(broker, connection);
// todo: get default read size from config!
- _amqpChannel = new AmqpChannel(new ByteChannel(_ioHandler));
+ IProtocolDecoderOutput decoderOutput =
+ new ProtocolDecoderOutput(_protocolListener);
+ _amqpChannel =
+ new AmqpChannel(new ByteChannel(_ioHandler), decoderOutput);
+
// post an initial async read
_amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this);
}
@@ -117,22 +122,28 @@ namespace Qpid.Client.Transport.Socket.Blocking {
try
{
- Queue frames = _amqpChannel.EndRead(result);
-
- // process results
- foreach ( IDataBlock dataBlock in frames )
- {
- _protocolListener.OnMessage(dataBlock);
- }
- // if we're not stopping, post a read again
+ _amqpChannel.EndRead(result);
+
bool stopping = _stopEvent.WaitOne(0, false);
if ( !stopping )
_amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null);
} catch ( Exception e )
{
- _protocolListener.OnException(e);
+ // ignore any errors during closing
+ bool stopping = _stopEvent.WaitOne(0, false);
+ if ( !stopping )
+ _protocolListener.OnException(e);
}
}
+
+ #region IProtocolDecoderOutput Members
+
+ public void Write(object message)
+ {
+ _protocolListener.OnMessage((IDataBlock)message);
+ }
+
+ #endregion
}
}
|
