diff options
Diffstat (limited to 'qpid/java/client/src')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java | 80 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 2 |
2 files changed, 80 insertions, 2 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 66cade18a4..35582d92b7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -21,16 +21,20 @@ package org.apache.qpid.client; import java.net.ConnectException; +import java.nio.ByteBuffer; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.EnumSet; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.JMSException; import javax.jms.XASession; +import org.apache.qpid.transport.Receiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +71,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class); private final AMQConnection _conn; + private final long _timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT, + Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT, + ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT)); private boolean _messageCompressionSupported; private boolean _addrSyntaxSupported; private boolean _confirmedPublishSupported; @@ -136,7 +143,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion()); - NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), + ReceiverClosedWaiter monitoringReceiver = new ReceiverClosedWaiter(securityLayer.receiver(_conn.getProtocolHandler())); + + NetworkConnection network = transport.connect(settings, monitoringReceiver, _conn.getProtocolHandler()); try @@ -171,6 +180,19 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate network.close(); throw e; } + finally + { + // await the receiver to finish its execution (and so the IO threads too) + if (!_conn.isConnected()) + { + boolean closedWithinTimeout = monitoringReceiver.awaitClose(_timeout); + if (!closedWithinTimeout) + { + _logger.warn("Timed-out waiting for receiver for connection to " + + brokerDetail + " to be closed."); + } + } + } } @@ -503,4 +525,60 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { return _confirmedPublishNonTransactionalSupported; } + + + private static class ReceiverClosedWaiter implements Receiver<ByteBuffer> + { + private final CountDownLatch _closedWatcher; + private final Receiver<ByteBuffer> _receiver; + + public ReceiverClosedWaiter(Receiver<ByteBuffer> receiver) + { + _receiver = receiver; + _closedWatcher = new CountDownLatch(1); + } + + @Override + public void received(ByteBuffer msg) + { + _receiver.received(msg); + } + + @Override + public void exception(Throwable t) + { + _receiver.exception(t); + } + + @Override + public void closed() + { + try + { + _receiver.closed(); + } + finally + { + _closedWatcher.countDown(); + } + } + + public boolean awaitClose(long timeout) + { + try + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Waiting " + timeout + "ms for receiver to be closed"); + } + + return _closedWatcher.await(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return _closedWatcher.getCount() == 0; + } + } + }; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c61469559a..c2582accdf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -240,7 +240,7 @@ public class AMQProtocolHandler implements ProtocolEngine } catch (Exception e) { - _logger.warn("Exception occured on closing the sender", e); + _logger.warn("Exception occurred on closing the sender", e); } if (_connection.failoverAllowed()) { |
