diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-05-09 13:53:51 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-05-09 13:53:51 +0000 |
| commit | 4cf2baa090c723b97a486bdb582ce07ff3a6c190 (patch) | |
| tree | 6fc914c48a396cdc9cd6aad90736d3c760f58271 /dotnet/Qpid.Client | |
| parent | f66b4596946bbd8df7e6afbfd421197fe9856f32 (diff) | |
| download | qpid-python-4cf2baa090c723b97a486bdb582ce07ff3a6c190.tar.gz | |
Merged revisions 652388-653415,653417-654109 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x
........
r652388 | ritchiem | 2008-04-30 15:40:18 +0100 (Wed, 30 Apr 2008) | 2 lines
QPID-889 : Removed _reapingStoreContext from CSDM replaced with local StoreContext()s so they are not reused by different threads.
........
r652389 | ritchiem | 2008-04-30 15:40:45 +0100 (Wed, 30 Apr 2008) | 1 line
QPID-887 : Renamed QueueHouseKeeping threads so they can be identified in thread dump. Named Queue-housekeeping-<virtualhost name>
........
r652399 | ritchiem | 2008-04-30 16:32:42 +0100 (Wed, 30 Apr 2008) | 1 line
QPID-888,QPID-886 : Fixed all management uses of _lock.lock / _lock.unlock so that they correctly call unlock from a finally block in the CSDM. There are two issues that cover that. QPID-888 - Fix the management ones and QPID-886 to fix the use in removeExpired.
........
r652567 | aidan | 2008-05-01 17:32:20 +0100 (Thu, 01 May 2008) | 1 line
QPID-994 Dont wait for attain state as connection is closed by we get CloseOk
........
r652568 | aidan | 2008-05-01 17:35:09 +0100 (Thu, 01 May 2008) | 1 line
QPID-1001 dont set the expiration time if TTL is 0
........
r653447 | aidan | 2008-05-05 13:26:29 +0100 (Mon, 05 May 2008) | 1 line
Check if consumer is closed and dont reclose it
........
r653451 | aidan | 2008-05-05 13:29:15 +0100 (Mon, 05 May 2008) | 1 line
QPID-1022 Use synchronous writes to fix race conditions
........
r653452 | aidan | 2008-05-05 13:30:45 +0100 (Mon, 05 May 2008) | 1 line
QPID-1023 increase some timeouts
........
r653760 | aidan | 2008-05-06 13:40:34 +0100 (Tue, 06 May 2008) | 3 lines
QPID-1029: Generate temporary queue names using GUIDs to ensure uniqueness.
........
r654097 | aidan | 2008-05-07 14:25:38 +0100 (Wed, 07 May 2008) | 2 lines
QPID-952, QPID-951, QPID-1032 Fix failover, ensure that it is properly detected, that frames are replayed approrpiately and that failover does not timeout.
........
r654104 | aidan | 2008-05-07 14:46:51 +0100 (Wed, 07 May 2008) | 1 line
QPID-952 should have been part of previous commit
........
r654109 | aidan | 2008-05-07 14:56:09 +0100 (Wed, 07 May 2008) | 2 lines
QPID-1036 increase timeouts to more reasonable levels, ensure that durable queues are deleted when no longer needed
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.x@654818 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client')
| -rw-r--r-- | dotnet/Qpid.Client/Client/AMQConnection.cs | 37 | ||||
| -rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 42 | ||||
| -rw-r--r-- | dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | 6 | ||||
| -rw-r--r-- | dotnet/Qpid.Client/Client/BasicMessageProducer.cs | 5 | ||||
| -rw-r--r-- | dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs | 14 |
5 files changed, 47 insertions, 57 deletions
diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs index d0bebf1170..41d4e089b6 100644 --- a/dotnet/Qpid.Client/Client/AMQConnection.cs +++ b/dotnet/Qpid.Client/Client/AMQConnection.cs @@ -69,7 +69,7 @@ namespace Apache.Qpid.Client internal bool IsFailoverAllowed { - get { return _failoverPolicy.FailoverAllowed(); } + get { if(!_connected) return false; else return _failoverPolicy.FailoverAllowed(); } } /// <summary> @@ -151,34 +151,22 @@ namespace Apache.Qpid.Client _log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e); // XXX: Should perhaps break out of the do/while here if not a SocketException... } - } while (_failoverPolicy.FailoverAllowed()); + } while (!_connected && _failoverPolicy.FailoverAllowed()); _log.Debug("Are we connected:" + _connected); - - if (!_failoverPolicy.FailoverAllowed()) - { - if ( lastException is AMQException ) - throw lastException; - else - throw new AMQConnectionException("Unable to connect", lastException); - } - // TODO: this needs to be redone so that we are not spinning. - // A suitable object should be set that is then waited on - // and only notified when a connection is made or when - // the AMQConnection gets closed. - while (!_connected && !Closed) + if (!_connected) { - _log.Debug("Sleeping."); - Thread.Sleep(100); - } - if (!_failoverPolicy.FailoverAllowed() || _failoverPolicy.GetCurrentBrokerInfo() == null) - { - if (_lastAMQException != null) - { - throw _lastAMQException; - } + if ( lastException is AMQException ) + { + throw lastException; + } + else + { + throw new AMQConnectionException("Unable to connect", lastException); + } } + } /*private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType) @@ -263,7 +251,6 @@ namespace Apache.Qpid.Client _log.Debug("Blocking for connection close ok frame"); - _stateManager.AttainState(AMQState.CONNECTION_CLOSED); Disconnect(); } diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index ce8e2ca2fe..86dc9a4681 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -888,10 +888,14 @@ namespace Apache.Qpid.Client /// <param name="consumer"></param> private void RegisterConsumer(BasicMessageConsumer consumer) { + // Need to generate a consumer tag on the client so we can exploit the nowait flag. + String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); + consumer.ConsumerTag = tag; + _consumers.Add(tag, consumer); + String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, - consumer.Exclusive, consumer.AcknowledgeMode); - consumer.ConsumerTag = consumerTag; - _consumers.Add(consumerTag, consumer); + consumer.Exclusive, consumer.AcknowledgeMode, tag); + } internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) @@ -902,19 +906,21 @@ namespace Apache.Qpid.Client AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, queueName, exchangeName, - routingKey, true, args); - _replayFrames.Add(queueBind); + routingKey, false, args); + lock (_connection.FailoverMutex) { - _connection.ProtocolWriter.Write(queueBind); + _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody)); } + // AS FIXME: wasnae me + _replayFrames.Add(QueueBindBody.CreateAMQFrame(_channelId, 0, + queueName, exchangeName, + routingKey, true, args)); } - private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode) + private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag) { - // Need to generate a consumer tag on the client so we can exploit the nowait flag. - String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, queueName, tag, noLocal, @@ -934,9 +940,7 @@ namespace Apache.Qpid.Client _logger.Debug(string.Format("DeleteQueue name={0}", queueName)); AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait); - - _replayFrames.Add(queueDelete); - + if (noWait) { _connection.ProtocolWriter.Write(queueDelete); @@ -945,6 +949,8 @@ namespace Apache.Qpid.Client { _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody)); } + // AS FIXME: wasnae me + _replayFrames.Add(QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, true)); } catch (AMQException) { @@ -958,14 +964,16 @@ namespace Apache.Qpid.Client queueName, isDurable, isExclusive, isAutoDelete)); AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, - isAutoDelete, true, null); + isAutoDelete, false, null); - _replayFrames.Add(queueDeclare); lock (_connection.FailoverMutex) { - _connection.ProtocolWriter.Write(queueDeclare); + _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody)); } + // AS FIXME: wasnae me + _replayFrames.Add(QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, + isAutoDelete, true, null)); } // AMQP-level method. @@ -978,8 +986,6 @@ namespace Apache.Qpid.Client AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive, durable, autoDelete, xinternal, noWait, args); - - _replayFrames.Add(declareExchange); if (noWait) { @@ -987,6 +993,8 @@ namespace Apache.Qpid.Client { _connection.ProtocolWriter.Write(declareExchange); } + // AS FIXME: wasnae me + _replayFrames.Add(declareExchange); } else { diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index e88cf8f04c..6fee316cb4 100644 --- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -266,7 +266,11 @@ namespace Apache.Qpid.Client public override void Close() { - // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex + if (_closed == CLOSED) + { + return; + } + // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex lock (_channel.Connection.FailoverMutex) { lock (_closingLock) diff --git a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs index ca6d2abee5..f33afc452e 100644 --- a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs +++ b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs @@ -306,7 +306,10 @@ namespace Apache.Qpid.Client if ( !_disableTimestamps ) { message.Timestamp = DateTime.UtcNow.Ticks; - message.Expiration = message.Timestamp + timeToLive; + if (timeToLive != 0) + { + message.Expiration = message.Timestamp + timeToLive; + } } else { message.Expiration = 0; diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs index 7ae086e35f..1fb3d407eb 100644 --- a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs +++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs @@ -35,12 +35,6 @@ namespace Apache.Qpid.Client.Protocol private readonly IProtocolWriter _protocolWriter; private readonly IConnectionCloser _connectionCloser; - /** - * Counter to ensure unique queue names - */ - private int _queueId = 1; - private readonly Object _queueIdLock = new Object(); - /// <summary> /// Maps from the channel id to the AmqChannel that it represents. /// </summary> @@ -267,13 +261,7 @@ namespace Apache.Qpid.Client.Protocol internal string GenerateQueueName() { - int id; - lock(_queueIdLock) - { - id = _queueId++; - } - - return "tmp_" + _connection.Transport.LocalEndpoint + "_" + id; + return "ntmp_" + System.Guid.NewGuid(); } } } |
