From bb320784c58b2d412c65291998d67b39cc94ee55 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 15 May 2013 11:28:43 +0000 Subject: QPID-4830 : [JMS AMQP 1.0] Improve JMS client error handling git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1482770 13f79535-47bb-0310-9956-ffa450edef68 --- .../amqp_1_0/jms/impl/MessageProducerImpl.java | 24 +++++++++++++++++++++ .../apache/qpid/amqp_1_0/jms/impl/SessionImpl.java | 2 +- .../qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java | 4 +++- .../org/apache/qpid/amqp_1_0/client/Receiver.java | 17 +++++++++++++++ .../org/apache/qpid/amqp_1_0/client/Sender.java | 25 ++++++++++++++++++++++ 5 files changed, 70 insertions(+), 2 deletions(-) (limited to 'java') diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java index 07a134f756..79c1606edb 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java @@ -34,6 +34,7 @@ import javax.jms.*; import javax.jms.IllegalStateException; import java.util.UUID; import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.amqp_1_0.type.transport.*; public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher { @@ -87,6 +88,29 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP jmsEx.setLinkedException(e); throw jmsEx; } + _sender.setRemoteErrorListener(new Runnable() + { + @Override + public void run() + { + try + { + final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener(); + + if(exceptionListener != null) + { + final org.apache.qpid.amqp_1_0.type.transport.Error receiverError = _sender.getError(); + exceptionListener.onException(new JMSException(receiverError.getDescription(), + receiverError.getCondition().getValue().toString())); + + } + } + catch (JMSException e) + { + + } + } + }); } } diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index 0b175f3b27..2ae67913fe 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -117,7 +117,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession if(error != null) { exceptionListener.onException(new JMSException(error.getDescription(), - error.getCondition().toString())); + error.getCondition().getValue().toString())); } else { diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java index 8e0d07e78b..3ac70a29f2 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java @@ -34,6 +34,7 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic private SessionImpl _session; private final Set _consumers = Collections.synchronizedSet(new HashSet()); + private boolean _deleted; protected TemporaryTopicImpl(String address, Sender sender, SessionImpl session) { @@ -57,6 +58,7 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic { if(_consumers.isEmpty()) { + _deleted = true; close(); } else @@ -105,6 +107,6 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic public boolean isDeleted() { - return _sender == null; + return _deleted; } } diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index a45a7f5309..596931088f 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -50,6 +50,7 @@ public class Receiver implements DeliveryStateHandler private Map _unsettledMap = new HashMap(); private MessageArrivalListener _messageArrivalListener; private org.apache.qpid.amqp_1_0.type.transport.Error _error; + private Runnable _remoteErrorTask; public Receiver(final Session session, final String linkName, @@ -125,6 +126,10 @@ public class Receiver implements DeliveryStateHandler public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) { _error = detach.getError(); + if(detach.getError()!=null) + { + remoteError(); + } super.remoteDetached(endpoint, detach); } }); @@ -171,6 +176,14 @@ public class Receiver implements DeliveryStateHandler } } + private void remoteError() + { + if(_remoteErrorTask != null) + { + _remoteErrorTask.run(); + } + } + private void postPrefetchAction() { if(_messageArrivalListener != null) @@ -595,4 +608,8 @@ public class Receiver implements DeliveryStateHandler void messageArrived(Receiver receiver); } + public void setRemoteErrorListener(Runnable listener) + { + _remoteErrorTask = listener; + } } diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java index 0600c18474..0feaa48805 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java @@ -48,6 +48,7 @@ public class Sender implements DeliveryStateHandler private Map _outcomeActions = Collections.synchronizedMap(new HashMap()); private boolean _closed; private Error _error; + private Runnable _remoteErrorTask; public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr) throws SenderCreationException, ConnectionClosedException @@ -178,6 +179,10 @@ public class Sender implements DeliveryStateHandler public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) { _error = detach.getError(); + if(_error != null) + { + remoteError(); + } super.remoteDetached(endpoint, detach); } }); @@ -398,6 +403,26 @@ public class Sender implements DeliveryStateHandler return _session; } + + private void remoteError() + { + if(_remoteErrorTask != null) + { + _remoteErrorTask.run(); + } + } + + + public void setRemoteErrorListener(Runnable listener) + { + _remoteErrorTask = listener; + } + + public Error getError() + { + return _error; + } + public class SenderCreationException extends Exception { public SenderCreationException(Throwable e) -- cgit v1.2.1