summaryrefslogtreecommitdiff
path: root/java/amqp-1-0-client-jms
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-05-11 13:07:11 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-05-11 13:07:11 +0000
commiteca5315ef57dd0a3abba05f549eb277db6357ae7 (patch)
tree69d4e3c931cc236042bfd3f206824bc5e495d7b8 /java/amqp-1-0-client-jms
parent8f5813d5c4bc68c29cd3ed86540041d1463b30f7 (diff)
downloadqpid-python-eca5315ef57dd0a3abba05f549eb277db6357ae7.tar.gz
QPID-4830 : [JMS AMQP 1.0] Improve error handling in the JMS client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1481321 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/amqp-1-0-client-jms')
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java29
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java17
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java86
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java66
4 files changed, 195 insertions, 3 deletions
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
new file mode 100644
index 0000000000..bc2b6349c8
--- /dev/null
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+
+public class MessageRejectedException extends JMSException
+{
+ public MessageRejectedException(String s)
+ {
+ super(s);
+ }
+}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
index cb42c49bbc..b6e11ab44e 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
@@ -220,12 +220,14 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
return receiveImpl(0L);
}
- private MessageImpl receiveImpl(long timeout) throws IllegalStateException
+ private MessageImpl receiveImpl(long timeout) throws JMSException
{
+
org.apache.qpid.amqp_1_0.client.Message msg;
boolean redelivery;
if(_replaymessages.isEmpty())
{
+ checkReceiverError();
msg = receive0(timeout);
redelivery = false;
}
@@ -242,8 +244,21 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
return createJMSMessage(msg, redelivery);
}
+ void checkReceiverError() throws JMSException
+ {
+ final Error receiverError = _receiver.getError();
+ if(receiverError != null)
+ {
+ JMSException jmsException =
+ new JMSException(receiverError.getDescription(), receiverError.getCondition().toString());
+
+ throw jmsException;
+ }
+ }
+
Message receive0(final long timeout)
{
+
Message message = _receiver.receive(timeout);
if(_session.getAckModeEnum() == Session.AcknowledgeMode.CLIENT_ACKNOWLEDGE)
{
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
index 77544e4112..3b44bae84b 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
@@ -19,17 +19,21 @@
package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
+import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.MessageProducer;
+import org.apache.qpid.amqp_1_0.jms.MessageRejectedException;
import org.apache.qpid.amqp_1_0.jms.QueueSender;
import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import javax.jms.*;
import javax.jms.IllegalStateException;
import java.util.UUID;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher
{
@@ -43,6 +47,8 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
private SessionImpl _session;
private Sender _sender;
private boolean _closed;
+ private boolean _syncPublish = Boolean.getBoolean("qpid.sync_publish");
+ private long _syncPublishTimeout = Long.getLong("qpid.sync_publish_timeout", 30000l);
protected MessageProducerImpl(final Destination destination,
final SessionImpl session) throws JMSException
@@ -251,7 +257,28 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
final org.apache.qpid.amqp_1_0.client.Message clientMessage = new org.apache.qpid.amqp_1_0.client.Message(msg.getSections());
- _sender.send(clientMessage, _session.getTxn());
+ DispositionAction action = null;
+
+ if(_syncPublish)
+ {
+ action = new DispositionAction(_sender);
+ }
+
+ try
+ {
+ _sender.send(clientMessage, _session.getTxn(), action);
+ }
+ catch (LinkDetachedException e)
+ {
+ JMSException jmsException = new InvalidDestinationException("Sender has been closed");
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+
+ if(_syncPublish && !action.wasAccepted(_syncPublishTimeout + System.currentTimeMillis()))
+ {
+ throw new MessageRejectedException("Message was rejected");
+ }
if(getDestination() != null)
{
@@ -377,4 +404,61 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
{
send(topic, message, deliveryMode, priority, ttl);
}
+
+ private static class DispositionAction implements Sender.OutcomeAction
+ {
+ private final Sender _sender;
+ private final Object _lock;
+ private Outcome _outcome;
+
+ public DispositionAction(Sender sender)
+ {
+ _sender = sender;
+ _lock = sender.getEndpoint().getLock();
+ }
+
+ @Override
+ public void onOutcome(Binary deliveryTag, Outcome outcome)
+ {
+ synchronized (_lock)
+ {
+ _outcome = outcome;
+ _lock.notifyAll();
+ }
+ }
+
+ public boolean wasAccepted(long timeout) throws JMSException
+ {
+ synchronized(_lock)
+ {
+ while(_outcome == null && !_sender.getEndpoint().isDetached())
+ {
+ try
+ {
+ _lock.wait(timeout - System.currentTimeMillis());
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ if(_outcome == null)
+ {
+
+ if(_sender.getEndpoint().isDetached())
+ {
+ throw new JMSException("Link was detached");
+ }
+ else
+ {
+ throw new JMSException("Timed out waiting for message acceptance");
+ }
+ }
+ else
+ {
+ return _outcome instanceof Accepted;
+ }
+ }
+ }
+ }
}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
index ccd5f01909..0b175f3b27 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -21,10 +21,12 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
@@ -52,9 +54,11 @@ import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.jms.TopicSession;
import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
public class SessionImpl implements Session, QueueSession, TopicSession
{
@@ -90,6 +94,45 @@ public class SessionImpl implements Session, QueueSession, TopicSession
jmsException.setLinkedException(e);
throw jmsException;
}
+ _session.getEndpoint().setSessionEventListener(new SessionEventListener.DefaultSessionEventListener()
+ {
+ @Override
+ public void remoteEnd(End end)
+ {
+ if(!_closed)
+ {
+ try
+ {
+ close();
+ }
+ catch (JMSException e)
+ {
+ }
+ try
+ {
+ final Error error = end.getError();
+ final ExceptionListener exceptionListener = _connection.getExceptionListener();
+ if(exceptionListener != null)
+ {
+ if(error != null)
+ {
+ exceptionListener.onException(new JMSException(error.getDescription(),
+ error.getCondition().toString()));
+ }
+ else
+ {
+ exceptionListener.onException(new JMSException("Session remotely closed"));
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+
+ }
+
+ }
+ }
+ });
if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED)
{
_txn = _session.createSessionLocalTransaction();
@@ -846,7 +889,28 @@ public class SessionImpl implements Session, QueueSession, TopicSession
}
}
+ Iterator<MessageConsumerImpl> consumers = _consumers.iterator();
+ while(consumers.hasNext())
+ {
+ MessageConsumerImpl consumer = consumers.next();
+ try
+ {
+ consumer.checkReceiverError();
+ }
+ catch (JMSException e)
+ {
+ consumers.remove();
+ try
+ {
+ _connection.getExceptionListener().onException(e);
+ consumer.close();
+ }
+ catch (JMSException e1)
+ {
+ }
+ }
+ }
}
}