diff options
| author | Tomas Restrepo <tomasr@apache.org> | 2007-05-10 22:25:01 +0000 |
|---|---|---|
| committer | Tomas Restrepo <tomasr@apache.org> | 2007-05-10 22:25:01 +0000 |
| commit | bb9ecb50f9b1a184e7ea59c933ceb56facd2fd3f (patch) | |
| tree | ceedfed77e541625e0460e8a6577d334ecffa43e /dotnet/Qpid.Client | |
| parent | 0e528b07a48edcb69d5833d1dd90f12f70403fa3 (diff) | |
| download | qpid-python-bb9ecb50f9b1a184e7ea59c933ceb56facd2fd3f.tar.gz | |
QPID-441 Fix handling of bounced messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@537019 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client')
4 files changed, 62 insertions, 7 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 07650c170b..3471ac3640 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -28,6 +28,7 @@ using Qpid.Client.Message; using Qpid.Collections; using Qpid.Framing; using Qpid.Messaging; +using Qpid.Protocol; namespace Qpid.Client { @@ -568,8 +569,14 @@ namespace Qpid.Client if (_logger.IsDebugEnabled) { _logger.Debug("Message received in session with channel id " + _channelId); - } - _queue.EnqueueBlocking(message); + } + if ( message.DeliverBody == null ) + { + ReturnBouncedMessage(message); + } else + { + _queue.EnqueueBlocking(message); + } } public int DefaultPrefetch @@ -986,5 +993,42 @@ namespace Qpid.Client // FIXME: lock FailoverMutex here? _connection.ProtocolWriter.Write(ackFrame); } + + /// <summary> + /// Handle a message that bounced from the server, creating + /// the corresponding exception and notifying the connection about it + /// </summary> + /// <param name="message">Unprocessed message</param> + private void ReturnBouncedMessage(UnprocessedMessage message) + { + try + { + AbstractQmsMessage bouncedMessage = + _messageFactoryRegistry.CreateMessage( + 0, false, message.ContentHeader, + message.Bodies + ); + + int errorCode = message.BounceBody.ReplyCode; + string reason = message.BounceBody.ReplyText; + _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + AMQException exception; + if ( errorCode == AMQConstant.NO_CONSUMERS.Code ) + { + exception = new AMQNoConsumersException(reason, bouncedMessage); + } else if ( errorCode == AMQConstant.NO_ROUTE.Code ) + { + exception = new AMQNoRouteException(reason, bouncedMessage); + } else + { + exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage); + } + _connection.ExceptionReceived(exception); + } catch ( Exception ex ) + { + _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex); + } + + } } } diff --git a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs index 78526f906f..0bd65a1ace 100644 --- a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs +++ b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs @@ -32,7 +32,7 @@ namespace Qpid.Client.Handler public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) { - _logger.Debug("New JmsBounce method received"); + _logger.Debug("New Basic.Return method received"); UnprocessedMessage msg = new UnprocessedMessage(); msg.DeliverBody = null; msg.BounceBody = (BasicReturnBody) evt.Method; diff --git a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs index 0ce8a393c9..7f88dd8219 100644 --- a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs +++ b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs @@ -44,11 +44,20 @@ namespace Qpid.Client.Handler AMQFrame frame = ChannelCloseOkBody.CreateAMQFrame(evt.ChannelId); evt.ProtocolSession.WriteFrame(frame); - // HACK + if ( errorCode != AMQConstant.REPLY_SUCCESS.Code ) { - _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception"); - evt.ProtocolSession.AMQConnection.ExceptionReceived(new AMQChannelClosedException(errorCode, "Error: " + reason)); + _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception"); + if ( errorCode == AMQConstant.NO_CONSUMERS.Code ) + throw new AMQNoConsumersException(reason); + if ( errorCode == AMQConstant.NO_ROUTE.Code ) + throw new AMQNoRouteException(reason); + if ( errorCode == AMQConstant.INVALID_ARGUMENT.Code ) + throw new AMQInvalidArgumentException(reason); + if ( errorCode == AMQConstant.INVALID_ROUTING_KEY.Code ) + throw new AMQInvalidRoutingKeyException(reason); + // any other + throw new AMQChannelClosedException(errorCode, "Error: " + reason); } evt.ProtocolSession.ChannelClosed(evt.ChannelId, errorCode, reason); } diff --git a/dotnet/Qpid.Client/Qpid.Client.csproj b/dotnet/Qpid.Client/Qpid.Client.csproj index 19d2180a09..31cd5e03ae 100644 --- a/dotnet/Qpid.Client/Qpid.Client.csproj +++ b/dotnet/Qpid.Client/Qpid.Client.csproj @@ -43,6 +43,8 @@ <Compile Include="Client\AMQDestination.cs" />
<Compile Include="Client\AmqChannel.cs" />
<Compile Include="Client\AMQAuthenticationException.cs" />
+ <Compile Include="Client\AMQNoConsumersException.cs" />
+ <Compile Include="Client\AMQNoRouteException.cs" />
<Compile Include="Client\Configuration\AuthenticationConfigurationSectionHandler.cs" />
<Compile Include="Client\Message\QpidHeaders.cs" />
<Compile Include="Client\QpidConnectionInfo.cs" />
@@ -144,4 +146,4 @@ <Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
\ No newline at end of file |
