summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client
diff options
context:
space:
mode:
authorTomas Restrepo <tomasr@apache.org>2007-05-10 22:25:01 +0000
committerTomas Restrepo <tomasr@apache.org>2007-05-10 22:25:01 +0000
commitbb9ecb50f9b1a184e7ea59c933ceb56facd2fd3f (patch)
treeceedfed77e541625e0460e8a6577d334ecffa43e /dotnet/Qpid.Client
parent0e528b07a48edcb69d5833d1dd90f12f70403fa3 (diff)
downloadqpid-python-bb9ecb50f9b1a184e7ea59c933ceb56facd2fd3f.tar.gz
QPID-441 Fix handling of bounced messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@537019 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client')
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs48
-rw-r--r--dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs2
-rw-r--r--dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs15
-rw-r--r--dotnet/Qpid.Client/Qpid.Client.csproj4
4 files changed, 62 insertions, 7 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
index 07650c170b..3471ac3640 100644
--- a/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -28,6 +28,7 @@ using Qpid.Client.Message;
using Qpid.Collections;
using Qpid.Framing;
using Qpid.Messaging;
+using Qpid.Protocol;
namespace Qpid.Client
{
@@ -568,8 +569,14 @@ namespace Qpid.Client
if (_logger.IsDebugEnabled)
{
_logger.Debug("Message received in session with channel id " + _channelId);
- }
- _queue.EnqueueBlocking(message);
+ }
+ if ( message.DeliverBody == null )
+ {
+ ReturnBouncedMessage(message);
+ } else
+ {
+ _queue.EnqueueBlocking(message);
+ }
}
public int DefaultPrefetch
@@ -986,5 +993,42 @@ namespace Qpid.Client
// FIXME: lock FailoverMutex here?
_connection.ProtocolWriter.Write(ackFrame);
}
+
+ /// <summary>
+ /// Handle a message that bounced from the server, creating
+ /// the corresponding exception and notifying the connection about it
+ /// </summary>
+ /// <param name="message">Unprocessed message</param>
+ private void ReturnBouncedMessage(UnprocessedMessage message)
+ {
+ try
+ {
+ AbstractQmsMessage bouncedMessage =
+ _messageFactoryRegistry.CreateMessage(
+ 0, false, message.ContentHeader,
+ message.Bodies
+ );
+
+ int errorCode = message.BounceBody.ReplyCode;
+ string reason = message.BounceBody.ReplyText;
+ _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")");
+ AMQException exception;
+ if ( errorCode == AMQConstant.NO_CONSUMERS.Code )
+ {
+ exception = new AMQNoConsumersException(reason, bouncedMessage);
+ } else if ( errorCode == AMQConstant.NO_ROUTE.Code )
+ {
+ exception = new AMQNoRouteException(reason, bouncedMessage);
+ } else
+ {
+ exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage);
+ }
+ _connection.ExceptionReceived(exception);
+ } catch ( Exception ex )
+ {
+ _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex);
+ }
+
+ }
}
}
diff --git a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
index 78526f906f..0bd65a1ace 100644
--- a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
+++ b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
@@ -32,7 +32,7 @@ namespace Qpid.Client.Handler
public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
{
- _logger.Debug("New JmsBounce method received");
+ _logger.Debug("New Basic.Return method received");
UnprocessedMessage msg = new UnprocessedMessage();
msg.DeliverBody = null;
msg.BounceBody = (BasicReturnBody) evt.Method;
diff --git a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
index 0ce8a393c9..7f88dd8219 100644
--- a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
+++ b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
@@ -44,11 +44,20 @@ namespace Qpid.Client.Handler
AMQFrame frame = ChannelCloseOkBody.CreateAMQFrame(evt.ChannelId);
evt.ProtocolSession.WriteFrame(frame);
- // HACK
+
if ( errorCode != AMQConstant.REPLY_SUCCESS.Code )
{
- _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception");
- evt.ProtocolSession.AMQConnection.ExceptionReceived(new AMQChannelClosedException(errorCode, "Error: " + reason));
+ _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception");
+ if ( errorCode == AMQConstant.NO_CONSUMERS.Code )
+ throw new AMQNoConsumersException(reason);
+ if ( errorCode == AMQConstant.NO_ROUTE.Code )
+ throw new AMQNoRouteException(reason);
+ if ( errorCode == AMQConstant.INVALID_ARGUMENT.Code )
+ throw new AMQInvalidArgumentException(reason);
+ if ( errorCode == AMQConstant.INVALID_ROUTING_KEY.Code )
+ throw new AMQInvalidRoutingKeyException(reason);
+ // any other
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason);
}
evt.ProtocolSession.ChannelClosed(evt.ChannelId, errorCode, reason);
}
diff --git a/dotnet/Qpid.Client/Qpid.Client.csproj b/dotnet/Qpid.Client/Qpid.Client.csproj
index 19d2180a09..31cd5e03ae 100644
--- a/dotnet/Qpid.Client/Qpid.Client.csproj
+++ b/dotnet/Qpid.Client/Qpid.Client.csproj
@@ -43,6 +43,8 @@
<Compile Include="Client\AMQDestination.cs" />
<Compile Include="Client\AmqChannel.cs" />
<Compile Include="Client\AMQAuthenticationException.cs" />
+ <Compile Include="Client\AMQNoConsumersException.cs" />
+ <Compile Include="Client\AMQNoRouteException.cs" />
<Compile Include="Client\Configuration\AuthenticationConfigurationSectionHandler.cs" />
<Compile Include="Client\Message\QpidHeaders.cs" />
<Compile Include="Client\QpidConnectionInfo.cs" />
@@ -144,4 +146,4 @@
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project> \ No newline at end of file