summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client/AMQConnection.cs
diff options
context:
space:
mode:
Diffstat (limited to 'dotnet/Qpid.Client/Client/AMQConnection.cs')
-rw-r--r--dotnet/Qpid.Client/Client/AMQConnection.cs67
1 files changed, 40 insertions, 27 deletions
diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs
index 4498ba3a32..d74cf6b5e4 100644
--- a/dotnet/Qpid.Client/Client/AMQConnection.cs
+++ b/dotnet/Qpid.Client/Client/AMQConnection.cs
@@ -24,17 +24,17 @@ using System.IO;
using System.Reflection;
using System.Threading;
using log4net;
-using Qpid.Client.Failover;
-using Qpid.Client.Protocol;
-using Qpid.Client.Qms;
-using Qpid.Client.State;
-using Qpid.Client.Transport;
-using Qpid.Client.Transport.Socket.Blocking;
-using Qpid.Collections;
-using Qpid.Framing;
-using Qpid.Messaging;
-
-namespace Qpid.Client
+using Apache.Qpid.Client.Failover;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client.State;
+using Apache.Qpid.Client.Transport;
+using Apache.Qpid.Client.Transport.Socket.Blocking;
+using Apache.Qpid.Collections;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Client
{
public class AMQConnection : Closeable, IConnection
{
@@ -273,15 +273,17 @@ namespace Qpid.Client
private bool _transacted;
private AcknowledgeMode _acknowledgeMode;
- int _prefetch;
+ int _prefetchHigh;
+ int _prefetchLow;
AMQConnection _connection;
- public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetch)
+ public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow)
{
_connection = connection;
_transacted = transacted;
_acknowledgeMode = acknowledgeMode;
- _prefetch = prefetch;
+ _prefetchHigh = prefetchHigh;
+ _prefetchLow = prefetchLow;
}
protected override object operation()
@@ -297,14 +299,14 @@ namespace Qpid.Client
// open it, so that there is no window where we could receive data on the channel and not be set
// up to handle it appropriately.
AmqChannel channel = new AmqChannel(_connection,
- channelId, _transacted, _acknowledgeMode, _prefetch);
+ channelId, _transacted, _acknowledgeMode, _prefetchHigh, _prefetchLow);
_connection.ProtocolSession.AddSessionByChannel(channelId, channel);
_connection.RegisterSession(channelId, channel);
bool success = false;
try
{
- _connection.createChannelOverWire(channelId, (ushort)_prefetch, _transacted);
+ _connection.CreateChannelOverWire(channelId, _prefetchHigh, _prefetchLow, _transacted);
success = true;
}
catch (AMQException e)
@@ -334,11 +336,16 @@ namespace Qpid.Client
public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode)
{
- return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH);
+ return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH_HIGH_MARK);
}
public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch)
{
+ return CreateChannel(transacted, acknowledgeMode, prefetch, prefetch);
+ }
+
+ public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow)
+ {
CheckNotClosed();
if (ChannelLimitReached())
{
@@ -347,7 +354,7 @@ namespace Qpid.Client
else
{
CreateChannelFailoverSupport operation =
- new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetch);
+ new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetchHigh, prefetchLow);
return (IChannel)operation.execute(this);
}
}
@@ -704,8 +711,8 @@ namespace Qpid.Client
/*
// Currently there is only one transport option - BlockingSocket.
- String assemblyName = "Qpid.Client.Transport.Socket.Blocking.dll";
- String transportType = "Qpid.Client.Transport.Socket.Blocking.BlockingSocketTransport";
+ String assemblyName = "Apache.Qpid.Client.Transport.Socket.Blocking.dll";
+ String transportType = "Apache.Qpid.Client.Transport.Socket.Blocking.BlockingSocketTransport";
// Load the transport assembly dynamically.
_transport = LoadTransportFromAssembly(brokerDetail.getHost(), brokerDetail.getPort(), assemblyName, transportType);
@@ -774,18 +781,23 @@ namespace Qpid.Client
foreach (AmqChannel channel in channels)
{
_protocolSession.AddSessionByChannel(channel.ChannelId, channel);
- ReopenChannel(channel.ChannelId, (ushort)channel.DefaultPrefetch, channel.Transacted);
+ ReopenChannel(
+ channel.ChannelId,
+ channel.DefaultPrefetchHigh,
+ channel.DefaultPrefetchLow,
+ channel.Transacted
+ );
channel.ReplayOnFailOver();
}
}
- private void ReopenChannel(ushort channelId, ushort prefetch, bool transacted)
+ private void ReopenChannel(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted)
{
- _log.Debug(string.Format("Reopening channel id={0} prefetch={1} transacted={2}",
- channelId, prefetch, transacted));
+ _log.Debug(string.Format("Reopening channel id={0} prefetchHigh={1} prefetchLow={2} transacted={3}",
+ channelId, prefetchHigh, prefetchLow, transacted));
try
{
- createChannelOverWire(channelId, prefetch, transacted);
+ CreateChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
}
catch (AMQException e)
{
@@ -795,7 +807,7 @@ namespace Qpid.Client
}
}
- void createChannelOverWire(ushort channelId, ushort prefetch, bool transacted)
+ void CreateChannelOverWire(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted)
{
_protocolWriter.SyncWrite(ChannelOpenBody.CreateAMQFrame(channelId, null), typeof (ChannelOpenOkBody));
@@ -805,7 +817,8 @@ namespace Qpid.Client
{
// Basic.Qos frame appears to not be supported by OpenAMQ 1.0d.
_protocolWriter.SyncWrite(
- BasicQosBody.CreateAMQFrame(channelId, 0, prefetch, false),
+ BasicQosBody.CreateAMQFrame(
+ channelId, (uint)prefetchHigh, 0, false),
typeof (BasicQosOkBody));
}