diff options
Diffstat (limited to 'dotnet/Qpid.Client/Client/AMQConnection.cs')
-rw-r--r-- | dotnet/Qpid.Client/Client/AMQConnection.cs | 67 |
1 files changed, 40 insertions, 27 deletions
diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs index 4498ba3a32..d74cf6b5e4 100644 --- a/dotnet/Qpid.Client/Client/AMQConnection.cs +++ b/dotnet/Qpid.Client/Client/AMQConnection.cs @@ -24,17 +24,17 @@ using System.IO; using System.Reflection; using System.Threading; using log4net; -using Qpid.Client.Failover; -using Qpid.Client.Protocol; -using Qpid.Client.Qms; -using Qpid.Client.State; -using Qpid.Client.Transport; -using Qpid.Client.Transport.Socket.Blocking; -using Qpid.Collections; -using Qpid.Framing; -using Qpid.Messaging; - -namespace Qpid.Client +using Apache.Qpid.Client.Failover; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client.State; +using Apache.Qpid.Client.Transport; +using Apache.Qpid.Client.Transport.Socket.Blocking; +using Apache.Qpid.Collections; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; + +namespace Apache.Qpid.Client { public class AMQConnection : Closeable, IConnection { @@ -273,15 +273,17 @@ namespace Qpid.Client private bool _transacted; private AcknowledgeMode _acknowledgeMode; - int _prefetch; + int _prefetchHigh; + int _prefetchLow; AMQConnection _connection; - public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetch) + public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow) { _connection = connection; _transacted = transacted; _acknowledgeMode = acknowledgeMode; - _prefetch = prefetch; + _prefetchHigh = prefetchHigh; + _prefetchLow = prefetchLow; } protected override object operation() @@ -297,14 +299,14 @@ namespace Qpid.Client // open it, so that there is no window where we could receive data on the channel and not be set // up to handle it appropriately. AmqChannel channel = new AmqChannel(_connection, - channelId, _transacted, _acknowledgeMode, _prefetch); + channelId, _transacted, _acknowledgeMode, _prefetchHigh, _prefetchLow); _connection.ProtocolSession.AddSessionByChannel(channelId, channel); _connection.RegisterSession(channelId, channel); bool success = false; try { - _connection.createChannelOverWire(channelId, (ushort)_prefetch, _transacted); + _connection.CreateChannelOverWire(channelId, _prefetchHigh, _prefetchLow, _transacted); success = true; } catch (AMQException e) @@ -334,11 +336,16 @@ namespace Qpid.Client public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode) { - return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH); + return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH_HIGH_MARK); } public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch) { + return CreateChannel(transacted, acknowledgeMode, prefetch, prefetch); + } + + public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow) + { CheckNotClosed(); if (ChannelLimitReached()) { @@ -347,7 +354,7 @@ namespace Qpid.Client else { CreateChannelFailoverSupport operation = - new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetch); + new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetchHigh, prefetchLow); return (IChannel)operation.execute(this); } } @@ -704,8 +711,8 @@ namespace Qpid.Client /* // Currently there is only one transport option - BlockingSocket. - String assemblyName = "Qpid.Client.Transport.Socket.Blocking.dll"; - String transportType = "Qpid.Client.Transport.Socket.Blocking.BlockingSocketTransport"; + String assemblyName = "Apache.Qpid.Client.Transport.Socket.Blocking.dll"; + String transportType = "Apache.Qpid.Client.Transport.Socket.Blocking.BlockingSocketTransport"; // Load the transport assembly dynamically. _transport = LoadTransportFromAssembly(brokerDetail.getHost(), brokerDetail.getPort(), assemblyName, transportType); @@ -774,18 +781,23 @@ namespace Qpid.Client foreach (AmqChannel channel in channels) { _protocolSession.AddSessionByChannel(channel.ChannelId, channel); - ReopenChannel(channel.ChannelId, (ushort)channel.DefaultPrefetch, channel.Transacted); + ReopenChannel( + channel.ChannelId, + channel.DefaultPrefetchHigh, + channel.DefaultPrefetchLow, + channel.Transacted + ); channel.ReplayOnFailOver(); } } - private void ReopenChannel(ushort channelId, ushort prefetch, bool transacted) + private void ReopenChannel(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted) { - _log.Debug(string.Format("Reopening channel id={0} prefetch={1} transacted={2}", - channelId, prefetch, transacted)); + _log.Debug(string.Format("Reopening channel id={0} prefetchHigh={1} prefetchLow={2} transacted={3}", + channelId, prefetchHigh, prefetchLow, transacted)); try { - createChannelOverWire(channelId, prefetch, transacted); + CreateChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); } catch (AMQException e) { @@ -795,7 +807,7 @@ namespace Qpid.Client } } - void createChannelOverWire(ushort channelId, ushort prefetch, bool transacted) + void CreateChannelOverWire(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted) { _protocolWriter.SyncWrite(ChannelOpenBody.CreateAMQFrame(channelId, null), typeof (ChannelOpenOkBody)); @@ -805,7 +817,8 @@ namespace Qpid.Client { // Basic.Qos frame appears to not be supported by OpenAMQ 1.0d. _protocolWriter.SyncWrite( - BasicQosBody.CreateAMQFrame(channelId, 0, prefetch, false), + BasicQosBody.CreateAMQFrame( + channelId, (uint)prefetchHigh, 0, false), typeof (BasicQosOkBody)); } |