From d3f445a199c0ed050bd4fa4bc00f331111a7a64d Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Thu, 5 Feb 2015 12:34:29 +0000 Subject: QPID-6363: [Java Client] Register SASL mechanisms early git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1657555 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/client/AMQConnection.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 70d91ad817..df55080f67 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -64,6 +64,7 @@ import org.apache.qpid.AMQUnresolvedAddressException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.security.CallbackHandlerRegistry; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; @@ -192,6 +193,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private boolean _compressMessages; private int _messageCompressionThresholdSize; + static + { + // The registering of any additional SASL mechanisms with the Java Security API requires + // SecurityManager permissions. In execution environments such as web containers, + // this may require adjustments to the Java security.policy. + CallbackHandlerRegistry registry = CallbackHandlerRegistry.getInstance(); + if (_logger.isDebugEnabled()) + { + _logger.debug("Loaded mechanisms " + registry.getMechanisms()); + } + } /** * @param broker brokerdetails * @param username username -- cgit v1.2.1 From bd9ca7255cd5c08ec348dd5976249e0a8dd1a8bc Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Sat, 7 Feb 2015 19:09:18 +0000 Subject: QPID-6371: [Java Broker/Java Common] Prevent log4j warning during Broker startup git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658098 13f79535-47bb-0310-9956-ffa450edef68 --- .../client/src/main/java/org/apache/qpid/client/AMQConnection.java | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index df55080f67..6c37462011 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -66,6 +66,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.security.CallbackHandlerRegistry; import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.common.QpidProperties; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -195,6 +196,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect static { + if (_logger.isDebugEnabled()) + { + _logger.debug("Qpid version : " + QpidProperties.getVersionString()); + } + // The registering of any additional SASL mechanisms with the Java Security API requires // SecurityManager permissions. In execution environments such as web containers, // this may require adjustments to the Java security.policy. -- cgit v1.2.1 From da8935e0491a2b8e9edb6671e5874b77516ee2a9 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 10 Feb 2015 09:32:24 +0000 Subject: QPID-6374 : Reorder ock aquisition and remove synchronization where it is not desired to reduce deadlocks git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658652 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 109 ++++++++++---------- .../java/org/apache/qpid/client/AMQSession.java | 111 ++++++++++----------- .../apache/qpid/client/BasicMessageConsumer.java | 41 ++++---- 3 files changed, 133 insertions(+), 128 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6c37462011..c7fcde824a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -865,13 +865,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close(long timeout) throws JMSException { - close(new ArrayList(_sessions.values()), timeout); - } + boolean closed; - public void close(List sessions, long timeout) throws JMSException - { - if (!setClosed()) + synchronized (_sessionCreationLock) + { + closed = setClosed(); + } + + if (!closed) { + List sessions = new ArrayList<>(_sessions.values()); + setClosing(true); try { @@ -886,54 +890,52 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void doClose(List sessions, long timeout) throws JMSException { - synchronized (_sessionCreationLock) + if (!sessions.isEmpty()) { - if (!sessions.isEmpty()) + AMQSession session = sessions.remove(0); + synchronized (session.getMessageDeliveryLock()) { - AMQSession session = sessions.remove(0); - synchronized (session.getMessageDeliveryLock()) - { - doClose(sessions, timeout); - } + doClose(sessions, timeout); } - else + } + else + { + synchronized (getFailoverMutex()) { - synchronized (getFailoverMutex()) + try { try { - try - { - closeAllSessions(null, timeout); - } - finally - { - //This MUST occur after we have successfully closed all Channels/Sessions - shutdownTaskPool(timeout); - } + closeAllSessions(null, timeout); } - catch (JMSException e) + finally { - _logger.error("Error closing connection", e); - JMSException jmse = new JMSException("Error closing connection: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; + //This MUST occur after we have successfully closed all Channels/Sessions + shutdownTaskPool(timeout); } - finally + } + catch (JMSException e) + { + _logger.error("Error closing connection", e); + JMSException jmse = new JMSException("Error closing connection: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + finally + { + try { - try - { - _delegate.closeConnection(timeout); - } - catch (Exception e) - { - _logger.warn("Error closing underlying protocol connection", e); - } + _delegate.closeConnection(timeout); + } + catch (Exception e) + { + _logger.warn("Error closing underlying protocol connection", e); } } } } + } private void shutdownTaskPool(final long timeout) @@ -1308,28 +1310,29 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { - // get the failover mutex before trying to close - synchronized (getFailoverMutex()) + // decide if we are going to close the session + if (hardError(cause)) { - // decide if we are going to close the session - if (hardError(cause)) + closer = (!setClosed()) || closer; { - closer = (!setClosed()) || closer; - { - _logger.info("Closing AMQConnection due to :" + cause); - } - } - else - { - _logger.info("Not a hard-error connection not closing: " + cause); + _logger.info("Closing AMQConnection due to :" + cause); } + } + else + { + _logger.info("Not a hard-error connection not closing: " + cause); + } + - // if we are closing the connection, close sessions first - if (closer) + // if we are closing the connection, close sessions first + if (closer) + { + // get the failover mutex before trying to close + synchronized (getFailoverMutex()) { try { - closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + closeAllSessions(cause, -1); } catch (JMSException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 29460bb42d..12e9285af8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -808,7 +808,16 @@ public abstract class AMQSession extends Closeable implements Messa if (sendClose) { + // The Synchronized block only needs to protect network traffic. - synchronized (_connection.getFailoverMutex()) + + try { - try + // If the session is open or we are in the process + // of closing the session then send a cance + // no point otherwise as the connection will be gone + if (!_session.isClosed() || _session.isClosing()) { - // If the session is open or we are in the process - // of closing the session then send a cance - // no point otherwise as the connection will be gone - if (!_session.isClosed() || _session.isClosing()) + synchronized(_session.getMessageDeliveryLock()) { - synchronized(_session.getMessageDeliveryLock()) + synchronized (_connection.getFailoverMutex()) { sendCancel(); } } } - catch (AMQException e) - { - throw new JMSAMQException("Error closing consumer: " + e, e); - } - catch (FailoverException e) - { - throw new JMSAMQException("FailoverException interrupted basic cancel.", e); - } - catch (TransportException e) - { - throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e); - } } + catch (AMQException e) + { + throw new JMSAMQException("Error closing consumer: " + e, e); + } + catch (FailoverException e) + { + throw new JMSAMQException("FailoverException interrupted basic cancel.", e); + } + catch (TransportException e) + { + throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e); + } + } else { -- cgit v1.2.1 From 6bca3754c2b893ae0a27d3c11559f25c9b1e7ea4 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 10 Feb 2015 11:58:57 +0000 Subject: QPID-6374 : Always call exception listener from connection task pool, rather than from within the failover mutex git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658689 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 15 ++++++++--- .../qpid/client/AMQConnectionDelegate_0_10.java | 30 ++++++++++++++-------- .../org/apache/qpid/client/MockAMQConnection.java | 10 ++++++-- 3 files changed, 39 insertions(+), 16 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index c7fcde824a..5518435b94 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1349,16 +1349,25 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void deliverJMSExceptionToExceptionListenerOrLog(final JMSException je, final Throwable cause) { - // deliver the exception if there is a listener - ExceptionListener exceptionListener = getExceptionListenerNoCheck(); + final ExceptionListener exceptionListener = getExceptionListenerNoCheck(); if (exceptionListener != null) { - exceptionListener.onException(je); + performConnectionTask(new Runnable() + { + @Override + public void run() + { + // deliver the exception if there is a listener + exceptionListener.onException(je); + } + }); } else { _logger.error("Throwable Received but no listener set: " + cause); } + + } private boolean hardError(Throwable cause) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 4e9164c3b0..fdeab7ae70 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -291,7 +291,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public void closed(Connection conn) { - ConnectionException exc = exception; + final ConnectionException exc = exception; exception = null; if (exc == null) @@ -299,7 +299,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return; } - ConnectionClose close = exc.getClose(); + final ConnectionClose close = exc.getClose(); if (close == null || close.getReplyCode() == ConnectionCloseCode.CONNECTION_FORCED) { _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1)); @@ -332,23 +332,31 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn.setClosed(); - ExceptionListener listener = _conn.getExceptionListenerNoCheck(); + final ExceptionListener listener = _conn.getExceptionListenerNoCheck(); if (listener == null) { _logger.error("connection exception: " + conn, exc); } else { - String code = null; - if (close != null) + _conn.performConnectionTask(new Runnable() { - code = close.getReplyCode().toString(); - } + @Override + public void run() + { + String code = null; + if (close != null) + { + code = close.getReplyCode().toString(); + } + + JMSException ex = new JMSException(exc.getMessage(), code); + ex.setLinkedException(exc); + ex.initCause(exc); + listener.onException(ex); + } + }); - JMSException ex = new JMSException(exc.getMessage(), code); - ex.setLinkedException(exc); - ex.initCause(exc); - listener.onException(ex); } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java index 009598d8a4..ceb2a323ca 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java @@ -20,14 +20,14 @@ */ package org.apache.qpid.client; +import java.io.IOException; + import org.apache.qpid.AMQException; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.url.URLSyntaxException; -import java.io.IOException; - public class MockAMQConnection extends AMQConnection { public MockAMQConnection(String broker, String username, String password, String clientName, String virtualHost) @@ -60,4 +60,10 @@ public class MockAMQConnection extends AMQConnection { return super.getDelegate(); } + + @Override + public void performConnectionTask(final Runnable task) + { + task.run(); + } } -- cgit v1.2.1 From e2e6d542b8cde9e702d1c3b63376e9d8380ba1c7 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 10 Feb 2015 13:37:00 +0000 Subject: QPID-6374 : tidyup calls to connection task pool git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658714 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 40 ++++++++---- .../org/apache/qpid/client/AMQSession_0_8.java | 71 ++++++++++++---------- 2 files changed, 67 insertions(+), 44 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 5518435b94..4c596b88a0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -32,6 +32,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -1353,16 +1354,23 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (exceptionListener != null) { performConnectionTask(new Runnable() - { - @Override - public void run() - { - // deliver the exception if there is a listener - exceptionListener.onException(je); - } - }); - } - else + { + @Override + public void run() + { + // deliver the exception if there is a listener + try + { + exceptionListener.onException(je); + } + catch (RuntimeException e) + { + _logger.error("Exception occurred in ExceptionListener", e); + } + } + }); + } + else { _logger.error("Throwable Received but no listener set: " + cause); } @@ -1478,7 +1486,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void performConnectionTask(Runnable task) { - _taskPool.execute(task); + try + { + _taskPool.execute(task); + } + catch (RejectedExecutionException e) + { + if(!(isClosed() || isClosing())) + { + throw e; + } + } } public AMQSession getSession(int channelId) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index bb0f0d9b13..143de271a1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -772,42 +772,47 @@ public class AMQSession_0_8 extends AMQSession