diff options
| author | Steven Shaw <steshaw@apache.org> | 2006-11-28 20:29:56 +0000 |
|---|---|---|
| committer | Steven Shaw <steshaw@apache.org> | 2006-11-28 20:29:56 +0000 |
| commit | c4fac88720bbfbaacbd821596d46e8715a161639 (patch) | |
| tree | de07d81c5be72af718b637bd8095f55ae4e3cee5 | |
| parent | a204a5602d5c8569a3046d424d7d3dc506fcec22 (diff) | |
| download | qpid-python-c4fac88720bbfbaacbd821596d46e8715a161639.tar.gz | |
Locked on FailoverMutex where necessary.
Noted that AMQConnection.CloseSession and BasicMessageConsumer.Close both lock on FailoverMutex but do ProtocolWriter.SyncWrite which probably means that they need to do the FailoverSupport thing instead. If it's a problem, it exists also in the Java client.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480190 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | dotnet/Qpid.Client/Client/AMQConnection.cs | 1 | ||||
| -rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 34 | ||||
| -rw-r--r-- | dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | 1 |
3 files changed, 25 insertions, 11 deletions
diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs index ed85ec483b..a0ca8b7bcf 100644 --- a/dotnet/Qpid.Client/Client/AMQConnection.cs +++ b/dotnet/Qpid.Client/Client/AMQConnection.cs @@ -343,6 +343,7 @@ namespace Qpid.Client public void CloseSession(AmqChannel channel) { + // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). _protocolSession.CloseSession(channel); AMQFrame frame = ChannelCloseBody.CreateAMQFrame( diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 5216394a26..fb4498d531 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -708,7 +708,8 @@ namespace Qpid.Client } /** - * Resubscribes all producers and consumers. This is called when performing failover. + * Replays frame on fail over. + * * @throws AMQException */ internal void ReplayOnFailOver() @@ -746,15 +747,19 @@ namespace Qpid.Client internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) { + _logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}", - queueName, exchangeName, routingKey, args)); + queueName, exchangeName, routingKey, args)); AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, queueName, exchangeName, routingKey, true, args); _replayFrames.Add(queueBind); - _connection.ProtocolWriter.Write(queueBind); + lock (_connection.FailoverMutex) + { + _connection.ProtocolWriter.Write(queueBind); + } } private String ConsumeFromQueue(String queueName, int prefetch, @@ -798,10 +803,7 @@ namespace Qpid.Client AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool disableTimestamps) { - lock (Connection.FailoverMutex) - { - DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps); - } + DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps); } private void DoBasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode deliveryMode, uint timeToLive, int priority, bool disableTimestamps) @@ -854,7 +856,10 @@ namespace Qpid.Client frames[0] = publishFrame; frames[1] = contentHeaderFrame; CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); - Connection.ConvenientProtocolWriter.WriteFrame(compositeFrame); + + lock (_connection.FailoverMutex) { + _connection.ProtocolWriter.Write(compositeFrame); + } } /// <summary> @@ -934,7 +939,10 @@ namespace Qpid.Client _replayFrames.Add(queueDeclare); - _connection.ProtocolWriter.Write(queueDeclare); + lock (_connection.FailoverMutex) + { + _connection.ProtocolWriter.Write(queueDeclare); + } } public void DeclareExchange(String exchangeName, String exchangeClass) @@ -959,11 +967,15 @@ namespace Qpid.Client if (noWait) { - _connection.ProtocolWriter.Write(declareExchange); + lock (_connection.FailoverMutex) + { + _connection.ProtocolWriter.Write(declareExchange); + } } else { - _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); + throw new NotImplementedException("Don't use nowait=false with DeclareExchange"); +// _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); } } } diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index 6ffa8d1d6a..f0603b6e8a 100644 --- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -255,6 +255,7 @@ namespace Qpid.Client public override void Close() { + // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex lock (_channel.Connection.FailoverMutex) { lock (_closingLock) |
