summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteven Shaw <steshaw@apache.org>2006-11-28 20:29:56 +0000
committerSteven Shaw <steshaw@apache.org>2006-11-28 20:29:56 +0000
commitc4fac88720bbfbaacbd821596d46e8715a161639 (patch)
treede07d81c5be72af718b637bd8095f55ae4e3cee5
parenta204a5602d5c8569a3046d424d7d3dc506fcec22 (diff)
downloadqpid-python-c4fac88720bbfbaacbd821596d46e8715a161639.tar.gz
Locked on FailoverMutex where necessary.
Noted that AMQConnection.CloseSession and BasicMessageConsumer.Close both lock on FailoverMutex but do ProtocolWriter.SyncWrite which probably means that they need to do the FailoverSupport thing instead. If it's a problem, it exists also in the Java client. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480190 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--dotnet/Qpid.Client/Client/AMQConnection.cs1
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs34
-rw-r--r--dotnet/Qpid.Client/Client/BasicMessageConsumer.cs1
3 files changed, 25 insertions, 11 deletions
diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs
index ed85ec483b..a0ca8b7bcf 100644
--- a/dotnet/Qpid.Client/Client/AMQConnection.cs
+++ b/dotnet/Qpid.Client/Client/AMQConnection.cs
@@ -343,6 +343,7 @@ namespace Qpid.Client
public void CloseSession(AmqChannel channel)
{
+ // FIXME: Don't we need FailoverSupport here (as we have SyncWrite).
_protocolSession.CloseSession(channel);
AMQFrame frame = ChannelCloseBody.CreateAMQFrame(
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
index 5216394a26..fb4498d531 100644
--- a/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -708,7 +708,8 @@ namespace Qpid.Client
}
/**
- * Resubscribes all producers and consumers. This is called when performing failover.
+ * Replays frame on fail over.
+ *
* @throws AMQException
*/
internal void ReplayOnFailOver()
@@ -746,15 +747,19 @@ namespace Qpid.Client
internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
{
+
_logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}",
- queueName, exchangeName, routingKey, args));
+ queueName, exchangeName, routingKey, args));
AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
queueName, exchangeName,
routingKey, true, args);
_replayFrames.Add(queueBind);
- _connection.ProtocolWriter.Write(queueBind);
+ lock (_connection.FailoverMutex)
+ {
+ _connection.ProtocolWriter.Write(queueBind);
+ }
}
private String ConsumeFromQueue(String queueName, int prefetch,
@@ -798,10 +803,7 @@ namespace Qpid.Client
AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive,
bool disableTimestamps)
{
- lock (Connection.FailoverMutex)
- {
- DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps);
- }
+ DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps);
}
private void DoBasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode deliveryMode, uint timeToLive, int priority, bool disableTimestamps)
@@ -854,7 +856,10 @@ namespace Qpid.Client
frames[0] = publishFrame;
frames[1] = contentHeaderFrame;
CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
- Connection.ConvenientProtocolWriter.WriteFrame(compositeFrame);
+
+ lock (_connection.FailoverMutex) {
+ _connection.ProtocolWriter.Write(compositeFrame);
+ }
}
/// <summary>
@@ -934,7 +939,10 @@ namespace Qpid.Client
_replayFrames.Add(queueDeclare);
- _connection.ProtocolWriter.Write(queueDeclare);
+ lock (_connection.FailoverMutex)
+ {
+ _connection.ProtocolWriter.Write(queueDeclare);
+ }
}
public void DeclareExchange(String exchangeName, String exchangeClass)
@@ -959,11 +967,15 @@ namespace Qpid.Client
if (noWait)
{
- _connection.ProtocolWriter.Write(declareExchange);
+ lock (_connection.FailoverMutex)
+ {
+ _connection.ProtocolWriter.Write(declareExchange);
+ }
}
else
{
- _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody));
+ throw new NotImplementedException("Don't use nowait=false with DeclareExchange");
+// _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody));
}
}
}
diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
index 6ffa8d1d6a..f0603b6e8a 100644
--- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
+++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
@@ -255,6 +255,7 @@ namespace Qpid.Client
public override void Close()
{
+ // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex
lock (_channel.Connection.FailoverMutex)
{
lock (_closingLock)