diff options
Diffstat (limited to 'dotnet/Qpid.Client')
| -rw-r--r-- | dotnet/Qpid.Client/Client/AMQConnection.cs | 1 | ||||
| -rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 34 | ||||
| -rw-r--r-- | dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | 1 |
3 files changed, 25 insertions, 11 deletions
diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs index ed85ec483b..a0ca8b7bcf 100644 --- a/dotnet/Qpid.Client/Client/AMQConnection.cs +++ b/dotnet/Qpid.Client/Client/AMQConnection.cs @@ -343,6 +343,7 @@ namespace Qpid.Client public void CloseSession(AmqChannel channel) { + // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). _protocolSession.CloseSession(channel); AMQFrame frame = ChannelCloseBody.CreateAMQFrame( diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 5216394a26..fb4498d531 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -708,7 +708,8 @@ namespace Qpid.Client } /** - * Resubscribes all producers and consumers. This is called when performing failover. + * Replays frame on fail over. + * * @throws AMQException */ internal void ReplayOnFailOver() @@ -746,15 +747,19 @@ namespace Qpid.Client internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) { + _logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}", - queueName, exchangeName, routingKey, args)); + queueName, exchangeName, routingKey, args)); AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, queueName, exchangeName, routingKey, true, args); _replayFrames.Add(queueBind); - _connection.ProtocolWriter.Write(queueBind); + lock (_connection.FailoverMutex) + { + _connection.ProtocolWriter.Write(queueBind); + } } private String ConsumeFromQueue(String queueName, int prefetch, @@ -798,10 +803,7 @@ namespace Qpid.Client AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool disableTimestamps) { - lock (Connection.FailoverMutex) - { - DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps); - } + DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps); } private void DoBasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode deliveryMode, uint timeToLive, int priority, bool disableTimestamps) @@ -854,7 +856,10 @@ namespace Qpid.Client frames[0] = publishFrame; frames[1] = contentHeaderFrame; CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); - Connection.ConvenientProtocolWriter.WriteFrame(compositeFrame); + + lock (_connection.FailoverMutex) { + _connection.ProtocolWriter.Write(compositeFrame); + } } /// <summary> @@ -934,7 +939,10 @@ namespace Qpid.Client _replayFrames.Add(queueDeclare); - _connection.ProtocolWriter.Write(queueDeclare); + lock (_connection.FailoverMutex) + { + _connection.ProtocolWriter.Write(queueDeclare); + } } public void DeclareExchange(String exchangeName, String exchangeClass) @@ -959,11 +967,15 @@ namespace Qpid.Client if (noWait) { - _connection.ProtocolWriter.Write(declareExchange); + lock (_connection.FailoverMutex) + { + _connection.ProtocolWriter.Write(declareExchange); + } } else { - _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); + throw new NotImplementedException("Don't use nowait=false with DeclareExchange"); +// _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); } } } diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index 6ffa8d1d6a..f0603b6e8a 100644 --- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -255,6 +255,7 @@ namespace Qpid.Client public override void Close() { + // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex lock (_channel.Connection.FailoverMutex) { lock (_closingLock) |
