diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-09-13 13:55:42 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-09-13 13:55:42 +0000 |
| commit | 7666ee8f50ba9554e71142290fcc58c2e1ad46e9 (patch) | |
| tree | 2200dc043732fd131a5d12503db161f9aa30095f /qpid/java/client/src/main | |
| parent | 4bae7e33e13d88004a45e9baaff87fb249b0aead (diff) | |
| download | qpid-python-7666ee8f50ba9554e71142290fcc58c2e1ad46e9.tar.gz | |
QPID-3448: catch exceptions from the underlying Transport/Session/Connection and rethrow as a JMSException like users are expecting
Applied patch by Oleksandr Rudyy <orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1170182 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main')
7 files changed, 188 insertions, 48 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 25562cfff7..e0da1ef41f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -97,7 +97,10 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -606,8 +609,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * Acknowledges all unacknowledged messages on the session, for all message consumers on the session. * * @throws IllegalStateException If the session is closed. + * @throws JMSException if there is a problem during acknowledge process. */ - public void acknowledge() throws IllegalStateException + public void acknowledge() throws IllegalStateException, JMSException { if (isClosed()) { @@ -625,7 +629,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { break; } - acknowledgeMessage(tag, false); + + try + { + acknowledgeMessage(tag, false); + } + catch (TransportException e) + { + throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e); + } } } @@ -763,6 +775,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _logger.debug( "Got FailoverException during channel close, ignored as channel already marked as closed."); } + catch (TransportException e) + { + throw toJMSException("Error closing session:" + e.getMessage(), e); + } finally { _connection.deregisterSession(_channelId); @@ -874,6 +890,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); } + catch(TransportException e) + { + throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e); + } } public abstract void sendCommit() throws AMQException, FailoverException; @@ -1071,6 +1091,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ex.setLinkedException(e); throw ex; } + catch(TransportException e) + { + throw toJMSException("Error when verifying destination", e); + } } String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector; @@ -1156,6 +1180,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return subscriber; } + catch (TransportException e) + { + throw toJMSException("Exception while creating durable subscriber:" + e.getMessage(), e); + } finally { _subscriberDetails.unlock(); @@ -1405,7 +1433,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkNotClosed(); - // return (QueueSender) createProducer(queue); return new QueueSenderAdapter(createProducer(queue), queue); } @@ -1442,7 +1469,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkNotClosed(); Topic dest = checkValidTopic(topic); - // AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest)); } @@ -1727,13 +1753,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Ensure that the session is not transacted. checkNotTransacted(); - // flush any acks we are holding in the buffer. - flushAcknowledgments(); - - // this is set only here, and the before the consumer's onMessage is called it is set to false - _inRecovery = true; + try { + // flush any acks we are holding in the buffer. + flushAcknowledgments(); + + // this is set only here, and the before the consumer's onMessage is called it is set to false + _inRecovery = true; boolean isSuspended = isSuspended(); @@ -1769,7 +1796,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e); } - + catch(TransportException e) + { + throw toJMSException("Recover failed: " + e.getMessage(), e); + } } protected abstract void sendRecover() throws AMQException, FailoverException; @@ -1854,6 +1884,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e); } + catch (TransportException e) + { + throw toJMSException("Failure to rollback:" + e.getMessage(), e); + } } } @@ -1900,7 +1934,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ public void unsubscribe(String name) throws JMSException { - unsubscribe(name, false); + try + { + unsubscribe(name, false); + } + catch (TransportException e) + { + throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e); + } } /** @@ -2021,8 +2062,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // argument, as specifying null for the arguments when querying means they should not be checked at all ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); - C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, - noLocal, exclusive, messageSelector, ft, noConsume, autoClose); + C consumer; + try + { + consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, + noLocal, exclusive, messageSelector, ft, noConsume, autoClose); + } + catch(TransportException e) + { + throw toJMSException("Exception while creating consumer: " + e.getMessage(), e); + } if (_messageListener != null) { @@ -2059,7 +2108,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ex.initCause(e); throw ex; } - + catch (TransportException e) + { + throw toJMSException("Exception while registering consumer:" + e.getMessage(), e); + } return consumer; } }, _connection).execute(); @@ -2601,8 +2653,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkNotClosed(); long producerId = getNextProducerId(); - P producer = createMessageProducer(destination, mandatory, - immediate, waitUntilSent, producerId); + + P producer; + try + { + producer = createMessageProducer(destination, mandatory, + immediate, waitUntilSent, producerId); + } + catch (TransportException e) + { + throw toJMSException("Exception while creating producer:" + e.getMessage(), e); + } + registerProducer(producerId, producer); return producer; @@ -3009,6 +3071,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { throw new AMQException(null, "Fail-over interrupted suspend/unsuspend channel.", e); } + catch (TransportException e) + { + throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e); + } } } @@ -3486,4 +3552,27 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { return DECLARE_EXCHANGES; } + + JMSException toJMSException(String message, TransportException e) + { + int code = getErrorCode(e); + JMSException jmse = new JMSException(message, Integer.toString(code)); + jmse.setLinkedException(e); + jmse.initCause(e); + return jmse; + } + + private int getErrorCode(TransportException e) + { + int code = AMQConstant.INTERNAL_ERROR.getCode(); + if (e instanceof SessionException) + { + SessionException se = (SessionException) e; + if(se.getException() != null && se.getException().getErrorCode() != null) + { + code = se.getException().getErrorCode().getValue(); + } + } + return code; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index b5868cd235..bfbb9f7148 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -72,6 +72,7 @@ import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.SessionListener; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.Serial; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -548,7 +549,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args) - throws JMSException { boolean res; ExchangeBoundResult bindingQueryResult = @@ -692,6 +692,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic throw ex; } + catch(TransportException e) + { + throw toJMSException("Exception while creating message producer:" + e.getMessage(), e); + } } @@ -994,7 +998,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - @Override public void commit() throws JMSException + @Override + public void commit() throws JMSException { checkTransacted(); try @@ -1007,12 +1012,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } sendCommit(); } - catch(SessionException e) + catch(TransportException e) { - JMSException ex = new JMSException("Session exception occured while trying to commit"); - ex.initCause(e); - ex.setLinkedException(e); - throw ex; + throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e); } catch (AMQException e) { @@ -1383,5 +1385,5 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sb.append(">"); return sb.toString(); } - + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 5d32863f2f..754055ad98 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -27,6 +27,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; +import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -419,6 +420,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa return null; } + catch(TransportException e) + { + throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e); + } finally { releaseReceiving(); @@ -489,6 +494,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa return null; } + catch(TransportException e) + { + throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e); + } finally { releaseReceiving(); @@ -582,6 +591,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { throw new JMSAMQException("FailoverException interrupted basic cancel.", e); } + catch (TransportException e) + { + throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e); + } } } else @@ -775,7 +788,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } - void postDeliver(AbstractJMSMessage msg) throws JMSException + void postDeliver(AbstractJMSMessage msg) { switch (_acknowledgeMode) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 964c238946..47da59724c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -23,9 +23,7 @@ import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; -import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; @@ -365,21 +363,28 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM public void setMessageListener(final MessageListener messageListener) throws JMSException { super.setMessageListener(messageListener); - if (messageListener != null && capacity == 0) - { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); - } - if (messageListener != null && !_synchronousQueue.isEmpty()) + try { - Iterator messages=_synchronousQueue.iterator(); - while (messages.hasNext()) + if (messageListener != null && capacity == 0) { - AbstractJMSMessage message=(AbstractJMSMessage) messages.next(); - messages.remove(); - _session.rejectMessage(message, true); + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } + if (messageListener != null && !_synchronousQueue.isEmpty()) + { + Iterator messages=_synchronousQueue.iterator(); + while (messages.hasNext()) + { + AbstractJMSMessage message=(AbstractJMSMessage) messages.next(); + messages.remove(); + _session.rejectMessage(message, true); + } + } + } + catch(TransportException e) + { + throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e); } } @@ -443,7 +448,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return o; } - void postDeliver(AbstractJMSMessage msg) throws JMSException + void postDeliver(AbstractJMSMessage msg) { super.postDeliver(msg); if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery()) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 8756ac4d05..2bfca025b2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -39,6 +39,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageConverter; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; import org.slf4j.Logger; @@ -266,7 +267,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac return _destination; } - public void close() + public void close() throws JMSException { _closed.set(true); _session.deregisterProducer(_producerId); @@ -498,7 +499,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac message.setJMSMessageID(messageId); } - sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait); + try + { + sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait); + } + catch (TransportException e) + { + throw getSession().toJMSException("Exception whilst sending:" + e.getMessage(), e); + } if (message != origMessage) { @@ -596,6 +604,13 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac public boolean isBound(AMQDestination destination) throws JMSException { - return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey()); + try + { + return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey()); + } + catch (TransportException e) + { + throw getSession().toJMSException("Exception whilst checking destination binding:" + e.getMessage(), e); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index d739903ee6..1fa5c1003f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -37,7 +37,6 @@ import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.client.messaging.address.Link.Reliability; -import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; @@ -47,6 +46,7 @@ import org.apache.qpid.transport.MessageDeliveryMode; import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -246,14 +246,14 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer } } - + @Override public boolean isBound(AMQDestination destination) throws JMSException { return _session.isQueueBound(destination); } @Override - public void close() + public void close() throws JMSException { super.close(); AMQDestination dest = _destination; @@ -262,10 +262,18 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer if (dest.getDelete() == AddressOption.ALWAYS || dest.getDelete() == AddressOption.SENDER ) { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( + try + { + ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( _destination.getQueueName()); + } + catch(TransportException e) + { + throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + } } } } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index 1c2c46cf51..43b3b85641 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -47,7 +47,6 @@ import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.client.CustomJMSXProperty; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Message; -import org.apache.qpid.messaging.Address; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.ExchangeQueryResult; import org.apache.qpid.transport.Future; @@ -56,6 +55,7 @@ import org.apache.qpid.transport.MessageDeliveryMode; import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.ReplyTo; +import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -342,6 +342,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate e.setLinkedException(ex); throw e; } + catch (TransportException e) + { + JMSException jmse = new JMSException("Exception occured while figuring out the node type:" + e.getMessage()); + jmse.initCause(e); + jmse.setLinkedException(e); + throw jmse; + } + } final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString()); |
