summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-09-13 13:55:42 +0000
committerRobert Gemmell <robbie@apache.org>2011-09-13 13:55:42 +0000
commit7666ee8f50ba9554e71142290fcc58c2e1ad46e9 (patch)
tree2200dc043732fd131a5d12503db161f9aa30095f /qpid/java/client/src/main
parent4bae7e33e13d88004a45e9baaff87fb249b0aead (diff)
downloadqpid-python-7666ee8f50ba9554e71142290fcc58c2e1ad46e9.tar.gz
QPID-3448: catch exceptions from the underlying Transport/Session/Connection and rethrow as a JMSException like users are expecting
Applied patch by Oleksandr Rudyy <orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1170182 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java121
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java18
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java15
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java35
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java21
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java10
7 files changed, 188 insertions, 48 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 25562cfff7..e0da1ef41f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -97,7 +97,10 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -606,8 +609,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
*
* @throws IllegalStateException If the session is closed.
+ * @throws JMSException if there is a problem during acknowledge process.
*/
- public void acknowledge() throws IllegalStateException
+ public void acknowledge() throws IllegalStateException, JMSException
{
if (isClosed())
{
@@ -625,7 +629,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
break;
}
- acknowledgeMessage(tag, false);
+
+ try
+ {
+ acknowledgeMessage(tag, false);
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
+ }
}
}
@@ -763,6 +775,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_logger.debug(
"Got FailoverException during channel close, ignored as channel already marked as closed.");
}
+ catch (TransportException e)
+ {
+ throw toJMSException("Error closing session:" + e.getMessage(), e);
+ }
finally
{
_connection.deregisterSession(_channelId);
@@ -874,6 +890,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
}
+ catch(TransportException e)
+ {
+ throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e);
+ }
}
public abstract void sendCommit() throws AMQException, FailoverException;
@@ -1071,6 +1091,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
ex.setLinkedException(e);
throw ex;
}
+ catch(TransportException e)
+ {
+ throw toJMSException("Error when verifying destination", e);
+ }
}
String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
@@ -1156,6 +1180,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return subscriber;
}
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while creating durable subscriber:" + e.getMessage(), e);
+ }
finally
{
_subscriberDetails.unlock();
@@ -1405,7 +1433,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkNotClosed();
- // return (QueueSender) createProducer(queue);
return new QueueSenderAdapter(createProducer(queue), queue);
}
@@ -1442,7 +1469,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkNotClosed();
Topic dest = checkValidTopic(topic);
- // AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
}
@@ -1727,13 +1753,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// Ensure that the session is not transacted.
checkNotTransacted();
- // flush any acks we are holding in the buffer.
- flushAcknowledgments();
-
- // this is set only here, and the before the consumer's onMessage is called it is set to false
- _inRecovery = true;
+
try
{
+ // flush any acks we are holding in the buffer.
+ flushAcknowledgments();
+
+ // this is set only here, and the before the consumer's onMessage is called it is set to false
+ _inRecovery = true;
boolean isSuspended = isSuspended();
@@ -1769,7 +1796,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e);
}
-
+ catch(TransportException e)
+ {
+ throw toJMSException("Recover failed: " + e.getMessage(), e);
+ }
}
protected abstract void sendRecover() throws AMQException, FailoverException;
@@ -1854,6 +1884,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e);
}
+ catch (TransportException e)
+ {
+ throw toJMSException("Failure to rollback:" + e.getMessage(), e);
+ }
}
}
@@ -1900,7 +1934,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
public void unsubscribe(String name) throws JMSException
{
- unsubscribe(name, false);
+ try
+ {
+ unsubscribe(name, false);
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e);
+ }
}
/**
@@ -2021,8 +2062,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// argument, as specifying null for the arguments when querying means they should not be checked at all
ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
- C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
- noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
+ C consumer;
+ try
+ {
+ consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
+ noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
+ }
+ catch(TransportException e)
+ {
+ throw toJMSException("Exception while creating consumer: " + e.getMessage(), e);
+ }
if (_messageListener != null)
{
@@ -2059,7 +2108,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
ex.initCause(e);
throw ex;
}
-
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while registering consumer:" + e.getMessage(), e);
+ }
return consumer;
}
}, _connection).execute();
@@ -2601,8 +2653,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkNotClosed();
long producerId = getNextProducerId();
- P producer = createMessageProducer(destination, mandatory,
- immediate, waitUntilSent, producerId);
+
+ P producer;
+ try
+ {
+ producer = createMessageProducer(destination, mandatory,
+ immediate, waitUntilSent, producerId);
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while creating producer:" + e.getMessage(), e);
+ }
+
registerProducer(producerId, producer);
return producer;
@@ -3009,6 +3071,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
throw new AMQException(null, "Fail-over interrupted suspend/unsuspend channel.", e);
}
+ catch (TransportException e)
+ {
+ throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e);
+ }
}
}
@@ -3486,4 +3552,27 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
return DECLARE_EXCHANGES;
}
+
+ JMSException toJMSException(String message, TransportException e)
+ {
+ int code = getErrorCode(e);
+ JMSException jmse = new JMSException(message, Integer.toString(code));
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ return jmse;
+ }
+
+ private int getErrorCode(TransportException e)
+ {
+ int code = AMQConstant.INTERNAL_ERROR.getCode();
+ if (e instanceof SessionException)
+ {
+ SessionException se = (SessionException) e;
+ if(se.getException() != null && se.getException().getErrorCode() != null)
+ {
+ code = se.getException().getErrorCode().getValue();
+ }
+ }
+ return code;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index b5868cd235..bfbb9f7148 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -72,6 +72,7 @@ import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.SessionListener;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Serial;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -548,7 +549,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
- throws JMSException
{
boolean res;
ExchangeBoundResult bindingQueryResult =
@@ -692,6 +692,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
throw ex;
}
+ catch(TransportException e)
+ {
+ throw toJMSException("Exception while creating message producer:" + e.getMessage(), e);
+ }
}
@@ -994,7 +998,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- @Override public void commit() throws JMSException
+ @Override
+ public void commit() throws JMSException
{
checkTransacted();
try
@@ -1007,12 +1012,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
sendCommit();
}
- catch(SessionException e)
+ catch(TransportException e)
{
- JMSException ex = new JMSException("Session exception occured while trying to commit");
- ex.initCause(e);
- ex.setLinkedException(e);
- throw ex;
+ throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e);
}
catch (AMQException e)
{
@@ -1383,5 +1385,5 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sb.append(">");
return sb.toString();
}
-
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 5d32863f2f..754055ad98 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -27,6 +27,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -419,6 +420,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
return null;
}
+ catch(TransportException e)
+ {
+ throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e);
+ }
finally
{
releaseReceiving();
@@ -489,6 +494,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
return null;
}
+ catch(TransportException e)
+ {
+ throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e);
+ }
finally
{
releaseReceiving();
@@ -582,6 +591,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
}
+ catch (TransportException e)
+ {
+ throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e);
+ }
}
}
else
@@ -775,7 +788,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
- void postDeliver(AbstractJMSMessage msg) throws JMSException
+ void postDeliver(AbstractJMSMessage msg)
{
switch (_acknowledgeMode)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 964c238946..47da59724c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -23,9 +23,7 @@ import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
@@ -365,21 +363,28 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
- if (messageListener != null && capacity == 0)
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
- }
- if (messageListener != null && !_synchronousQueue.isEmpty())
+ try
{
- Iterator messages=_synchronousQueue.iterator();
- while (messages.hasNext())
+ if (messageListener != null && capacity == 0)
{
- AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
- messages.remove();
- _session.rejectMessage(message, true);
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
+ if (messageListener != null && !_synchronousQueue.isEmpty())
+ {
+ Iterator messages=_synchronousQueue.iterator();
+ while (messages.hasNext())
+ {
+ AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+ messages.remove();
+ _session.rejectMessage(message, true);
+ }
+ }
+ }
+ catch(TransportException e)
+ {
+ throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e);
}
}
@@ -443,7 +448,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return o;
}
- void postDeliver(AbstractJMSMessage msg) throws JMSException
+ void postDeliver(AbstractJMSMessage msg)
{
super.postDeliver(msg);
if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 8756ac4d05..2bfca025b2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -39,6 +39,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
import org.slf4j.Logger;
@@ -266,7 +267,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
return _destination;
}
- public void close()
+ public void close() throws JMSException
{
_closed.set(true);
_session.deregisterProducer(_producerId);
@@ -498,7 +499,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
message.setJMSMessageID(messageId);
}
- sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
+ try
+ {
+ sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
+ }
+ catch (TransportException e)
+ {
+ throw getSession().toJMSException("Exception whilst sending:" + e.getMessage(), e);
+ }
if (message != origMessage)
{
@@ -596,6 +604,13 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
public boolean isBound(AMQDestination destination) throws JMSException
{
- return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
+ try
+ {
+ return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
+ }
+ catch (TransportException e)
+ {
+ throw getSession().toJMSException("Exception whilst checking destination binding:" + e.getMessage(), e);
+ }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index d739903ee6..1fa5c1003f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -37,7 +37,6 @@ import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.messaging.address.Link.Reliability;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
@@ -47,6 +46,7 @@ import org.apache.qpid.transport.MessageDeliveryMode;
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -246,14 +246,14 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
}
}
-
+ @Override
public boolean isBound(AMQDestination destination) throws JMSException
{
return _session.isQueueBound(destination);
}
@Override
- public void close()
+ public void close() throws JMSException
{
super.close();
AMQDestination dest = _destination;
@@ -262,10 +262,18 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
if (dest.getDelete() == AddressOption.ALWAYS ||
dest.getDelete() == AddressOption.SENDER )
{
- ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
+ try
+ {
+ ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
_destination.getQueueName());
+ }
+ catch(TransportException e)
+ {
+ throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+ }
}
}
}
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index 1c2c46cf51..43b3b85641 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -47,7 +47,6 @@ import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Message;
-import org.apache.qpid.messaging.Address;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.ExchangeQueryResult;
import org.apache.qpid.transport.Future;
@@ -56,6 +55,7 @@ import org.apache.qpid.transport.MessageDeliveryMode;
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -342,6 +342,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
e.setLinkedException(ex);
throw e;
}
+ catch (TransportException e)
+ {
+ JMSException jmse = new JMSException("Exception occured while figuring out the node type:" + e.getMessage());
+ jmse.initCause(e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+
}
final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString());