diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-05-15 11:28:43 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-05-15 11:28:43 +0000 |
| commit | bb320784c58b2d412c65291998d67b39cc94ee55 (patch) | |
| tree | 1bb199913b901f4c95eac1d8aecd2af14e65b743 /java | |
| parent | fb1b1d90c69da2100496a9882ed610fb105e77a7 (diff) | |
| download | qpid-python-bb320784c58b2d412c65291998d67b39cc94ee55.tar.gz | |
QPID-4830 : [JMS AMQP 1.0] Improve JMS client error handling
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1482770 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
5 files changed, 70 insertions, 2 deletions
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java index 07a134f756..79c1606edb 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java @@ -34,6 +34,7 @@ import javax.jms.*; import javax.jms.IllegalStateException;
import java.util.UUID;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.transport.*;
public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher
{
@@ -87,6 +88,29 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP jmsEx.setLinkedException(e);
throw jmsEx;
}
+ _sender.setRemoteErrorListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener();
+
+ if(exceptionListener != null)
+ {
+ final org.apache.qpid.amqp_1_0.type.transport.Error receiverError = _sender.getError();
+ exceptionListener.onException(new JMSException(receiverError.getDescription(),
+ receiverError.getCondition().getValue().toString()));
+
+ }
+ }
+ catch (JMSException e)
+ {
+
+ }
+ }
+ });
}
}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index 0b175f3b27..2ae67913fe 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -117,7 +117,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession if(error != null)
{
exceptionListener.onException(new JMSException(error.getDescription(),
- error.getCondition().toString()));
+ error.getCondition().getValue().toString()));
}
else
{
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java index 8e0d07e78b..3ac70a29f2 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java @@ -34,6 +34,7 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic private SessionImpl _session;
private final Set<MessageConsumer> _consumers =
Collections.synchronizedSet(new HashSet<MessageConsumer>());
+ private boolean _deleted;
protected TemporaryTopicImpl(String address, Sender sender, SessionImpl session)
{
@@ -57,6 +58,7 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic {
if(_consumers.isEmpty())
{
+ _deleted = true;
close();
}
else
@@ -105,6 +107,6 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic public boolean isDeleted()
{
- return _sender == null;
+ return _deleted;
}
}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index a45a7f5309..596931088f 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -50,6 +50,7 @@ public class Receiver implements DeliveryStateHandler private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
private MessageArrivalListener _messageArrivalListener;
private org.apache.qpid.amqp_1_0.type.transport.Error _error;
+ private Runnable _remoteErrorTask;
public Receiver(final Session session,
final String linkName,
@@ -125,6 +126,10 @@ public class Receiver implements DeliveryStateHandler public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
{
_error = detach.getError();
+ if(detach.getError()!=null)
+ {
+ remoteError();
+ }
super.remoteDetached(endpoint, detach);
}
});
@@ -171,6 +176,14 @@ public class Receiver implements DeliveryStateHandler }
}
+ private void remoteError()
+ {
+ if(_remoteErrorTask != null)
+ {
+ _remoteErrorTask.run();
+ }
+ }
+
private void postPrefetchAction()
{
if(_messageArrivalListener != null)
@@ -595,4 +608,8 @@ public class Receiver implements DeliveryStateHandler void messageArrived(Receiver receiver);
}
+ public void setRemoteErrorListener(Runnable listener)
+ {
+ _remoteErrorTask = listener;
+ }
}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java index 0600c18474..0feaa48805 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java @@ -48,6 +48,7 @@ public class Sender implements DeliveryStateHandler private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
private boolean _closed;
private Error _error;
+ private Runnable _remoteErrorTask;
public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
throws SenderCreationException, ConnectionClosedException
@@ -178,6 +179,10 @@ public class Sender implements DeliveryStateHandler public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
{
_error = detach.getError();
+ if(_error != null)
+ {
+ remoteError();
+ }
super.remoteDetached(endpoint, detach);
}
});
@@ -398,6 +403,26 @@ public class Sender implements DeliveryStateHandler return _session;
}
+
+ private void remoteError()
+ {
+ if(_remoteErrorTask != null)
+ {
+ _remoteErrorTask.run();
+ }
+ }
+
+
+ public void setRemoteErrorListener(Runnable listener)
+ {
+ _remoteErrorTask = listener;
+ }
+
+ public Error getError()
+ {
+ return _error;
+ }
+
public class SenderCreationException extends Exception
{
public SenderCreationException(Throwable e)
|
