summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteven Shaw <steshaw@apache.org>2006-11-28 20:29:56 +0000
committerSteven Shaw <steshaw@apache.org>2006-11-28 20:29:56 +0000
commit99845d14a20360de46b1cb7aa1ddf7fd45b8aa8b (patch)
tree2df629345789ada664260f6680c3a4a945519c57
parenteb4d957415a1fbe2451fdd1c7f3f18b7bc8d145f (diff)
downloadqpid-python-99845d14a20360de46b1cb7aa1ddf7fd45b8aa8b.tar.gz
Locked on FailoverMutex where necessary.
Noted that AMQConnection.CloseSession and BasicMessageConsumer.Close both lock on FailoverMutex but do ProtocolWriter.SyncWrite which probably means that they need to do the FailoverSupport thing instead. If it's a problem, it exists also in the Java client. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@480190 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/dotnet/Qpid.Client/Client/AMQConnection.cs1
-rw-r--r--qpid/dotnet/Qpid.Client/Client/AmqChannel.cs34
-rw-r--r--qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs1
3 files changed, 25 insertions, 11 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs b/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
index ed85ec483b..a0ca8b7bcf 100644
--- a/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
+++ b/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
@@ -343,6 +343,7 @@ namespace Qpid.Client
public void CloseSession(AmqChannel channel)
{
+ // FIXME: Don't we need FailoverSupport here (as we have SyncWrite).
_protocolSession.CloseSession(channel);
AMQFrame frame = ChannelCloseBody.CreateAMQFrame(
diff --git a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
index 5216394a26..fb4498d531 100644
--- a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -708,7 +708,8 @@ namespace Qpid.Client
}
/**
- * Resubscribes all producers and consumers. This is called when performing failover.
+ * Replays frame on fail over.
+ *
* @throws AMQException
*/
internal void ReplayOnFailOver()
@@ -746,15 +747,19 @@ namespace Qpid.Client
internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
{
+
_logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}",
- queueName, exchangeName, routingKey, args));
+ queueName, exchangeName, routingKey, args));
AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
queueName, exchangeName,
routingKey, true, args);
_replayFrames.Add(queueBind);
- _connection.ProtocolWriter.Write(queueBind);
+ lock (_connection.FailoverMutex)
+ {
+ _connection.ProtocolWriter.Write(queueBind);
+ }
}
private String ConsumeFromQueue(String queueName, int prefetch,
@@ -798,10 +803,7 @@ namespace Qpid.Client
AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive,
bool disableTimestamps)
{
- lock (Connection.FailoverMutex)
- {
- DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps);
- }
+ DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps);
}
private void DoBasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode deliveryMode, uint timeToLive, int priority, bool disableTimestamps)
@@ -854,7 +856,10 @@ namespace Qpid.Client
frames[0] = publishFrame;
frames[1] = contentHeaderFrame;
CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
- Connection.ConvenientProtocolWriter.WriteFrame(compositeFrame);
+
+ lock (_connection.FailoverMutex) {
+ _connection.ProtocolWriter.Write(compositeFrame);
+ }
}
/// <summary>
@@ -934,7 +939,10 @@ namespace Qpid.Client
_replayFrames.Add(queueDeclare);
- _connection.ProtocolWriter.Write(queueDeclare);
+ lock (_connection.FailoverMutex)
+ {
+ _connection.ProtocolWriter.Write(queueDeclare);
+ }
}
public void DeclareExchange(String exchangeName, String exchangeClass)
@@ -959,11 +967,15 @@ namespace Qpid.Client
if (noWait)
{
- _connection.ProtocolWriter.Write(declareExchange);
+ lock (_connection.FailoverMutex)
+ {
+ _connection.ProtocolWriter.Write(declareExchange);
+ }
}
else
{
- _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody));
+ throw new NotImplementedException("Don't use nowait=false with DeclareExchange");
+// _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody));
}
}
}
diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
index 6ffa8d1d6a..f0603b6e8a 100644
--- a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
+++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
@@ -255,6 +255,7 @@ namespace Qpid.Client
public override void Close()
{
+ // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex
lock (_channel.Connection.FailoverMutex)
{
lock (_closingLock)