diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-05-11 13:07:11 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-05-11 13:07:11 +0000 |
| commit | eca5315ef57dd0a3abba05f549eb277db6357ae7 (patch) | |
| tree | 69d4e3c931cc236042bfd3f206824bc5e495d7b8 /java/amqp-1-0-client-jms | |
| parent | 8f5813d5c4bc68c29cd3ed86540041d1463b30f7 (diff) | |
| download | qpid-python-eca5315ef57dd0a3abba05f549eb277db6357ae7.tar.gz | |
QPID-4830 : [JMS AMQP 1.0] Improve error handling in the JMS client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1481321 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/amqp-1-0-client-jms')
4 files changed, 195 insertions, 3 deletions
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java new file mode 100644 index 0000000000..bc2b6349c8 --- /dev/null +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.amqp_1_0.jms; + +import javax.jms.JMSException; + +public class MessageRejectedException extends JMSException +{ + public MessageRejectedException(String s) + { + super(s); + } +} diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index cb42c49bbc..b6e11ab44e 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -220,12 +220,14 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi return receiveImpl(0L);
}
- private MessageImpl receiveImpl(long timeout) throws IllegalStateException
+ private MessageImpl receiveImpl(long timeout) throws JMSException
{
+
org.apache.qpid.amqp_1_0.client.Message msg;
boolean redelivery;
if(_replaymessages.isEmpty())
{
+ checkReceiverError();
msg = receive0(timeout);
redelivery = false;
}
@@ -242,8 +244,21 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi return createJMSMessage(msg, redelivery);
}
+ void checkReceiverError() throws JMSException
+ {
+ final Error receiverError = _receiver.getError();
+ if(receiverError != null)
+ {
+ JMSException jmsException =
+ new JMSException(receiverError.getDescription(), receiverError.getCondition().toString());
+
+ throw jmsException;
+ }
+ }
+
Message receive0(final long timeout)
{
+
Message message = _receiver.receive(timeout);
if(_session.getAckModeEnum() == Session.AcknowledgeMode.CLIENT_ACKNOWLEDGE)
{
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java index 77544e4112..3b44bae84b 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java @@ -19,17 +19,21 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
+import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.MessageProducer;
+import org.apache.qpid.amqp_1_0.jms.MessageRejectedException;
import org.apache.qpid.amqp_1_0.jms.QueueSender;
import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import javax.jms.*;
import javax.jms.IllegalStateException;
import java.util.UUID;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher
{
@@ -43,6 +47,8 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP private SessionImpl _session;
private Sender _sender;
private boolean _closed;
+ private boolean _syncPublish = Boolean.getBoolean("qpid.sync_publish");
+ private long _syncPublishTimeout = Long.getLong("qpid.sync_publish_timeout", 30000l);
protected MessageProducerImpl(final Destination destination,
final SessionImpl session) throws JMSException
@@ -251,7 +257,28 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP final org.apache.qpid.amqp_1_0.client.Message clientMessage = new org.apache.qpid.amqp_1_0.client.Message(msg.getSections());
- _sender.send(clientMessage, _session.getTxn());
+ DispositionAction action = null;
+
+ if(_syncPublish)
+ {
+ action = new DispositionAction(_sender);
+ }
+
+ try
+ {
+ _sender.send(clientMessage, _session.getTxn(), action);
+ }
+ catch (LinkDetachedException e)
+ {
+ JMSException jmsException = new InvalidDestinationException("Sender has been closed");
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+
+ if(_syncPublish && !action.wasAccepted(_syncPublishTimeout + System.currentTimeMillis()))
+ {
+ throw new MessageRejectedException("Message was rejected");
+ }
if(getDestination() != null)
{
@@ -377,4 +404,61 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP {
send(topic, message, deliveryMode, priority, ttl);
}
+
+ private static class DispositionAction implements Sender.OutcomeAction
+ {
+ private final Sender _sender;
+ private final Object _lock;
+ private Outcome _outcome;
+
+ public DispositionAction(Sender sender)
+ {
+ _sender = sender;
+ _lock = sender.getEndpoint().getLock();
+ }
+
+ @Override
+ public void onOutcome(Binary deliveryTag, Outcome outcome)
+ {
+ synchronized (_lock)
+ {
+ _outcome = outcome;
+ _lock.notifyAll();
+ }
+ }
+
+ public boolean wasAccepted(long timeout) throws JMSException
+ {
+ synchronized(_lock)
+ {
+ while(_outcome == null && !_sender.getEndpoint().isDetached())
+ {
+ try
+ {
+ _lock.wait(timeout - System.currentTimeMillis());
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ if(_outcome == null)
+ {
+
+ if(_sender.getEndpoint().isDetached())
+ {
+ throw new JMSException("Link was detached");
+ }
+ else
+ {
+ throw new JMSException("Timed out waiting for message acceptance");
+ }
+ }
+ else
+ {
+ return _outcome instanceof Accepted;
+ }
+ }
+ }
+ }
}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index ccd5f01909..0b175f3b27 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -21,10 +21,12 @@ package org.apache.qpid.amqp_1_0.jms.impl; import java.io.Serializable;
import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
@@ -52,9 +54,11 @@ import org.apache.qpid.amqp_1_0.jms.TemporaryDestination; import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.jms.TopicSession;
import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
public class SessionImpl implements Session, QueueSession, TopicSession
{
@@ -90,6 +94,45 @@ public class SessionImpl implements Session, QueueSession, TopicSession jmsException.setLinkedException(e);
throw jmsException;
}
+ _session.getEndpoint().setSessionEventListener(new SessionEventListener.DefaultSessionEventListener()
+ {
+ @Override
+ public void remoteEnd(End end)
+ {
+ if(!_closed)
+ {
+ try
+ {
+ close();
+ }
+ catch (JMSException e)
+ {
+ }
+ try
+ {
+ final Error error = end.getError();
+ final ExceptionListener exceptionListener = _connection.getExceptionListener();
+ if(exceptionListener != null)
+ {
+ if(error != null)
+ {
+ exceptionListener.onException(new JMSException(error.getDescription(),
+ error.getCondition().toString()));
+ }
+ else
+ {
+ exceptionListener.onException(new JMSException("Session remotely closed"));
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+
+ }
+
+ }
+ }
+ });
if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED)
{
_txn = _session.createSessionLocalTransaction();
@@ -846,7 +889,28 @@ public class SessionImpl implements Session, QueueSession, TopicSession }
}
+ Iterator<MessageConsumerImpl> consumers = _consumers.iterator();
+ while(consumers.hasNext())
+ {
+ MessageConsumerImpl consumer = consumers.next();
+ try
+ {
+ consumer.checkReceiverError();
+ }
+ catch (JMSException e)
+ {
+ consumers.remove();
+ try
+ {
+ _connection.getExceptionListener().onException(e);
+ consumer.close();
+ }
+ catch (JMSException e1)
+ {
+ }
+ }
+ }
}
}
|
