summaryrefslogtreecommitdiff
path: root/dotnet
diff options
context:
space:
mode:
Diffstat (limited to 'dotnet')
-rw-r--r--dotnet/Qpid.Client/Client/AMQConnection.cs1
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs34
-rw-r--r--dotnet/Qpid.Client/Client/BasicMessageConsumer.cs1
3 files changed, 25 insertions, 11 deletions
diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs
index ed85ec483b..a0ca8b7bcf 100644
--- a/dotnet/Qpid.Client/Client/AMQConnection.cs
+++ b/dotnet/Qpid.Client/Client/AMQConnection.cs
@@ -343,6 +343,7 @@ namespace Qpid.Client
public void CloseSession(AmqChannel channel)
{
+ // FIXME: Don't we need FailoverSupport here (as we have SyncWrite).
_protocolSession.CloseSession(channel);
AMQFrame frame = ChannelCloseBody.CreateAMQFrame(
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
index 5216394a26..fb4498d531 100644
--- a/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -708,7 +708,8 @@ namespace Qpid.Client
}
/**
- * Resubscribes all producers and consumers. This is called when performing failover.
+ * Replays frame on fail over.
+ *
* @throws AMQException
*/
internal void ReplayOnFailOver()
@@ -746,15 +747,19 @@ namespace Qpid.Client
internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
{
+
_logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}",
- queueName, exchangeName, routingKey, args));
+ queueName, exchangeName, routingKey, args));
AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
queueName, exchangeName,
routingKey, true, args);
_replayFrames.Add(queueBind);
- _connection.ProtocolWriter.Write(queueBind);
+ lock (_connection.FailoverMutex)
+ {
+ _connection.ProtocolWriter.Write(queueBind);
+ }
}
private String ConsumeFromQueue(String queueName, int prefetch,
@@ -798,10 +803,7 @@ namespace Qpid.Client
AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive,
bool disableTimestamps)
{
- lock (Connection.FailoverMutex)
- {
- DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps);
- }
+ DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps);
}
private void DoBasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode deliveryMode, uint timeToLive, int priority, bool disableTimestamps)
@@ -854,7 +856,10 @@ namespace Qpid.Client
frames[0] = publishFrame;
frames[1] = contentHeaderFrame;
CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
- Connection.ConvenientProtocolWriter.WriteFrame(compositeFrame);
+
+ lock (_connection.FailoverMutex) {
+ _connection.ProtocolWriter.Write(compositeFrame);
+ }
}
/// <summary>
@@ -934,7 +939,10 @@ namespace Qpid.Client
_replayFrames.Add(queueDeclare);
- _connection.ProtocolWriter.Write(queueDeclare);
+ lock (_connection.FailoverMutex)
+ {
+ _connection.ProtocolWriter.Write(queueDeclare);
+ }
}
public void DeclareExchange(String exchangeName, String exchangeClass)
@@ -959,11 +967,15 @@ namespace Qpid.Client
if (noWait)
{
- _connection.ProtocolWriter.Write(declareExchange);
+ lock (_connection.FailoverMutex)
+ {
+ _connection.ProtocolWriter.Write(declareExchange);
+ }
}
else
{
- _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody));
+ throw new NotImplementedException("Don't use nowait=false with DeclareExchange");
+// _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody));
}
}
}
diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
index 6ffa8d1d6a..f0603b6e8a 100644
--- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
+++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
@@ -255,6 +255,7 @@ namespace Qpid.Client
public override void Close()
{
+ // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex
lock (_channel.Connection.FailoverMutex)
{
lock (_closingLock)