From 0b22baa11318fc7e86c9d1b9b74ad3d83e276859 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 11 May 2013 13:07:11 +0000 Subject: QPID-4830 : [JMS AMQP 1.0] Improve error handling in the JMS client git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1481321 13f79535-47bb-0310-9956-ffa450edef68 --- .../amqp_1_0/jms/MessageRejectedException.java | 29 ++++++++ .../amqp_1_0/jms/impl/MessageConsumerImpl.java | 17 ++++- .../amqp_1_0/jms/impl/MessageProducerImpl.java | 86 +++++++++++++++++++++- .../apache/qpid/amqp_1_0/jms/impl/SessionImpl.java | 66 ++++++++++++++++- .../java/org/apache/qpid/amqp_1_0/client/Demo.java | 37 ++++++++-- .../java/org/apache/qpid/amqp_1_0/client/Dump.java | 9 +-- .../apache/qpid/amqp_1_0/client/Filesender.java | 19 +---- .../org/apache/qpid/amqp_1_0/client/Request.java | 15 +--- .../org/apache/qpid/amqp_1_0/client/Respond.java | 12 +-- .../java/org/apache/qpid/amqp_1_0/client/Send.java | 11 +-- .../amqp_1_0/client/LinkDetachedException.java | 40 ++++++++++ .../org/apache/qpid/amqp_1_0/client/Sender.java | 44 +++++++++-- 12 files changed, 312 insertions(+), 73 deletions(-) create mode 100644 qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java create mode 100644 qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java (limited to 'qpid/java') diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java new file mode 100644 index 0000000000..bc2b6349c8 --- /dev/null +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.amqp_1_0.jms; + +import javax.jms.JMSException; + +public class MessageRejectedException extends JMSException +{ + public MessageRejectedException(String s) + { + super(s); + } +} diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index cb42c49bbc..b6e11ab44e 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -220,12 +220,14 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi return receiveImpl(0L); } - private MessageImpl receiveImpl(long timeout) throws IllegalStateException + private MessageImpl receiveImpl(long timeout) throws JMSException { + org.apache.qpid.amqp_1_0.client.Message msg; boolean redelivery; if(_replaymessages.isEmpty()) { + checkReceiverError(); msg = receive0(timeout); redelivery = false; } @@ -242,8 +244,21 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi return createJMSMessage(msg, redelivery); } + void checkReceiverError() throws JMSException + { + final Error receiverError = _receiver.getError(); + if(receiverError != null) + { + JMSException jmsException = + new JMSException(receiverError.getDescription(), receiverError.getCondition().toString()); + + throw jmsException; + } + } + Message receive0(final long timeout) { + Message message = _receiver.receive(timeout); if(_session.getAckModeEnum() == Session.AcknowledgeMode.CLIENT_ACKNOWLEDGE) { diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java index 77544e4112..3b44bae84b 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java @@ -19,17 +19,21 @@ package org.apache.qpid.amqp_1_0.jms.impl; import org.apache.qpid.amqp_1_0.client.ConnectionClosedException; +import org.apache.qpid.amqp_1_0.client.LinkDetachedException; import org.apache.qpid.amqp_1_0.client.Sender; import org.apache.qpid.amqp_1_0.jms.MessageProducer; +import org.apache.qpid.amqp_1_0.jms.MessageRejectedException; import org.apache.qpid.amqp_1_0.jms.QueueSender; import org.apache.qpid.amqp_1_0.jms.TemporaryDestination; import org.apache.qpid.amqp_1_0.jms.TopicPublisher; import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Outcome; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; import javax.jms.*; import javax.jms.IllegalStateException; import java.util.UUID; +import org.apache.qpid.amqp_1_0.type.messaging.Accepted; public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher { @@ -43,6 +47,8 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP private SessionImpl _session; private Sender _sender; private boolean _closed; + private boolean _syncPublish = Boolean.getBoolean("qpid.sync_publish"); + private long _syncPublishTimeout = Long.getLong("qpid.sync_publish_timeout", 30000l); protected MessageProducerImpl(final Destination destination, final SessionImpl session) throws JMSException @@ -251,7 +257,28 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP final org.apache.qpid.amqp_1_0.client.Message clientMessage = new org.apache.qpid.amqp_1_0.client.Message(msg.getSections()); - _sender.send(clientMessage, _session.getTxn()); + DispositionAction action = null; + + if(_syncPublish) + { + action = new DispositionAction(_sender); + } + + try + { + _sender.send(clientMessage, _session.getTxn(), action); + } + catch (LinkDetachedException e) + { + JMSException jmsException = new InvalidDestinationException("Sender has been closed"); + jmsException.setLinkedException(e); + throw jmsException; + } + + if(_syncPublish && !action.wasAccepted(_syncPublishTimeout + System.currentTimeMillis())) + { + throw new MessageRejectedException("Message was rejected"); + } if(getDestination() != null) { @@ -377,4 +404,61 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP { send(topic, message, deliveryMode, priority, ttl); } + + private static class DispositionAction implements Sender.OutcomeAction + { + private final Sender _sender; + private final Object _lock; + private Outcome _outcome; + + public DispositionAction(Sender sender) + { + _sender = sender; + _lock = sender.getEndpoint().getLock(); + } + + @Override + public void onOutcome(Binary deliveryTag, Outcome outcome) + { + synchronized (_lock) + { + _outcome = outcome; + _lock.notifyAll(); + } + } + + public boolean wasAccepted(long timeout) throws JMSException + { + synchronized(_lock) + { + while(_outcome == null && !_sender.getEndpoint().isDetached()) + { + try + { + _lock.wait(timeout - System.currentTimeMillis()); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + } + if(_outcome == null) + { + + if(_sender.getEndpoint().isDetached()) + { + throw new JMSException("Link was detached"); + } + else + { + throw new JMSException("Timed out waiting for message acceptance"); + } + } + else + { + return _outcome instanceof Accepted; + } + } + } + } } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index ccd5f01909..0b175f3b27 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -21,10 +21,12 @@ package org.apache.qpid.amqp_1_0.jms.impl; import java.io.Serializable; import java.util.ArrayList; import java.util.Enumeration; +import java.util.Iterator; import java.util.List; import java.util.UUID; import javax.jms.BytesMessage; import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; @@ -52,9 +54,11 @@ import org.apache.qpid.amqp_1_0.jms.TemporaryDestination; import org.apache.qpid.amqp_1_0.jms.TopicPublisher; import org.apache.qpid.amqp_1_0.jms.TopicSession; import org.apache.qpid.amqp_1_0.jms.TopicSubscriber; +import org.apache.qpid.amqp_1_0.transport.SessionEventListener; import org.apache.qpid.amqp_1_0.type.messaging.Source; import org.apache.qpid.amqp_1_0.type.messaging.Target; -import org.apache.qpid.amqp_1_0.type.transport.AmqpError; +import org.apache.qpid.amqp_1_0.type.transport.*; +import org.apache.qpid.amqp_1_0.type.transport.Error; public class SessionImpl implements Session, QueueSession, TopicSession { @@ -90,6 +94,45 @@ public class SessionImpl implements Session, QueueSession, TopicSession jmsException.setLinkedException(e); throw jmsException; } + _session.getEndpoint().setSessionEventListener(new SessionEventListener.DefaultSessionEventListener() + { + @Override + public void remoteEnd(End end) + { + if(!_closed) + { + try + { + close(); + } + catch (JMSException e) + { + } + try + { + final Error error = end.getError(); + final ExceptionListener exceptionListener = _connection.getExceptionListener(); + if(exceptionListener != null) + { + if(error != null) + { + exceptionListener.onException(new JMSException(error.getDescription(), + error.getCondition().toString())); + } + else + { + exceptionListener.onException(new JMSException("Session remotely closed")); + } + } + } + catch (JMSException e) + { + + } + + } + } + }); if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED) { _txn = _session.createSessionLocalTransaction(); @@ -846,7 +889,28 @@ public class SessionImpl implements Session, QueueSession, TopicSession } } + Iterator consumers = _consumers.iterator(); + while(consumers.hasNext()) + { + MessageConsumerImpl consumer = consumers.next(); + try + { + consumer.checkReceiverError(); + } + catch (JMSException e) + { + consumers.remove(); + try + { + _connection.getExceptionListener().onException(e); + consumer.close(); + } + catch (JMSException e1) + { + } + } + } } } diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java index c3193f9fea..5e77b7097c 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java @@ -173,7 +173,14 @@ public class Demo extends Util Section[] sections = { properties, appProperties, amqpValue}; final Message message1 = new Message(Arrays.asList(sections)); - s.send(message1); + try + { + s.send(message1); + } + catch (Exception e) + { + throw new RuntimeException(e); + } Map sendingLinks = new HashMap(); Map receivingLinks = new HashMap(); @@ -295,7 +302,14 @@ public class Demo extends Util m2propmap.put(VENDOR, vendor); ApplicationProperties m2appProps = new ApplicationProperties(m2propmap); Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId))); - sender.send(m2); + try + { + sender.send(m2); + } + catch (Exception e) + { + throw new RuntimeException(e); + } Map m3propmap = new HashMap(); m3propmap.put(OPCODE, LOG); @@ -307,8 +321,14 @@ public class Demo extends Util Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), new AmqpValue("AMQP-"+messageId))); - s.send(m3); - + try + { + s.send(m3); + } + catch (Exception e) + { + throw new RuntimeException(e); + } } responseReceiver.acknowledge(m); @@ -336,7 +356,14 @@ public class Demo extends Util Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), new AmqpValue("AMQP-"+mp.getMessageId()))); - s.send(m3); + try + { + s.send(m3); + } + catch (Exception e) + { + throw new RuntimeException(e); + } entry.getValue().acknowledge(m); } diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java index 998d68cfa6..06440b8f19 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java @@ -121,14 +121,7 @@ public class Dump extends Util session.close(); conn.close(); - } catch (ConnectionException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (Sender.SenderClosingException e) + } catch (Exception e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java index 37550ea52f..c7bcd99312 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java @@ -248,27 +248,10 @@ public class Filesender extends Util session.close(); conn.close(); } - catch (ConnectionException e) + catch (Exception e) { e.printStackTrace(); } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); - } catch (FileNotFoundException e) - { - e.printStackTrace(); - } catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (NoSuchAlgorithmException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java index d8da58dc76..dbe273182f 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java @@ -216,23 +216,10 @@ public class Request extends Util conn2.close(); } } - catch (ConnectionException e) + catch (Exception e) { e.printStackTrace(); //TODO. } - catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //TODO. - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - } protected boolean hasSingleLinkPerConnectionMode() diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java index 6b1b70476e..1e4bcfc7d7 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java @@ -274,21 +274,13 @@ public class Respond extends Util _conn.close(); System.out.println("Received: " + receivedCount); } - catch (ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) + catch (Exception e) { e.printStackTrace(); //TODO. } } - private void respond(Message m) throws Sender.SenderCreationException, ConnectionClosedException + private void respond(Message m) throws Sender.SenderCreationException, ConnectionClosedException, LinkDetachedException { List
sections = m.getPayload(); String replyTo = null; diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java index ef1a31005c..b4ae16ab3f 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java @@ -219,19 +219,10 @@ public class Send extends Util session.close(); conn.close(); } - catch (Sender.SenderClosingException e) + catch (Exception e) { e.printStackTrace(); //TODO. } - catch (ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //TODO. - } - } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java new file mode 100644 index 0000000000..45b00255f2 --- /dev/null +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.transport.Error; + +public class LinkDetachedException extends Exception +{ + private final org.apache.qpid.amqp_1_0.type.transport.Error _remoteError; + + public LinkDetachedException(Error remoteError) + { + super(); + _remoteError = remoteError; + } + + public org.apache.qpid.amqp_1_0.type.transport.Error getRemoteError() + { + return _remoteError; + } + +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java index cf9f44af75..0600c18474 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java @@ -22,7 +22,9 @@ package org.apache.qpid.amqp_1_0.client; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; +import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; import org.apache.qpid.amqp_1_0.type.*; import org.apache.qpid.amqp_1_0.type.Source; import org.apache.qpid.amqp_1_0.type.Target; @@ -35,6 +37,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.qpid.amqp_1_0.type.transport.Error; public class Sender implements DeliveryStateHandler { @@ -44,6 +47,7 @@ public class Sender implements DeliveryStateHandler private int _windowSize; private Map _outcomeActions = Collections.synchronizedMap(new HashMap()); private boolean _closed; + private Error _error; public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr) throws SenderCreationException, ConnectionClosedException @@ -166,6 +170,17 @@ public class Sender implements DeliveryStateHandler throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress()); }; } + + _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener() + { + + @Override + public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) + { + _error = detach.getError(); + super.remoteDetached(endpoint, detach); + } + }); } public Source getSource() @@ -178,22 +193,22 @@ public class Sender implements DeliveryStateHandler return _endpoint.getTarget(); } - public void send(Message message) + public void send(Message message) throws LinkDetachedException { send(message, null, null); } - public void send(Message message, final OutcomeAction action) + public void send(Message message, final OutcomeAction action) throws LinkDetachedException { send(message, null, action); } - public void send(Message message, final Transaction txn) + public void send(Message message, final Transaction txn) throws LinkDetachedException { send(message, txn, null); } - public void send(Message message, final Transaction txn, OutcomeAction action) + public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException { List
sections = message.getPayload(); @@ -245,7 +260,7 @@ public class Sender implements DeliveryStateHandler final Object lock = _endpoint.getLock(); synchronized(lock) { - while(!_endpoint.hasCreditToSend()) + while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached()) { try { @@ -256,6 +271,10 @@ public class Sender implements DeliveryStateHandler e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } + if(_endpoint.isDetached()) + { + throw new LinkDetachedException(_error); + } if(action != null) { _outcomeActions.put(message.getDeliveryTag(), action); @@ -352,6 +371,21 @@ public class Sender implements DeliveryStateHandler _endpoint.updateDisposition(deliveryTag, state, true); } } + else if(state instanceof TransactionalState) + { + OutcomeAction action; + + if((action = _outcomeActions.remove(deliveryTag)) != null) + { + action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome()); + } + + } + } + + public SendingLinkEndpoint getEndpoint() + { + return _endpoint; } public Map getRemoteUnsettled() -- cgit v1.2.1