diff options
Diffstat (limited to 'java')
10 files changed, 404 insertions, 41 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index f15af72407..941534c7ff 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -175,6 +175,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // new amqp-0-10 encoded format. private boolean _useLegacyMapMessageFormat; + //used to track the last failover time for + //Address resolution purposes + private volatile long _lastFailoverTime = 0; + /** * @param broker brokerdetails * @param username username @@ -1076,6 +1080,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ public boolean firePreFailover(boolean redirect) { + _lastFailoverTime = System.currentTimeMillis(); boolean proceed = true; if (_connectionListener != null) { @@ -1462,4 +1467,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } } + + public long getLastFailoverTime() + { + return _lastFailoverTime; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 0ed3db6ecb..0d48dd5822 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -281,24 +281,29 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1)); - try + _qpidConnection.notifyFailoverRequired(); + + synchronized (_conn.getFailoverMutex()) { - if (_conn.firePreFailover(false) && _conn.attemptReconnection()) + try { - _conn.failoverPrep(); - _conn.resubscribeSessions(); - _conn.fireFailoverComplete(); - return; + if (_conn.firePreFailover(false) && _conn.attemptReconnection()) + { + _conn.failoverPrep(); + _conn.resubscribeSessions(); + _conn.fireFailoverComplete(); + return; + } + } + catch (Exception e) + { + _logger.error("error during failover", e); + } + finally + { + _conn.getProtocolHandler().getFailoverLatch().countDown(); + _conn.getProtocolHandler().setFailoverLatch(null); } - } - catch (Exception e) - { - _logger.error("error during failover", e); - } - finally - { - _conn.getProtocolHandler().getFailoverLatch().countDown(); - _conn.getProtocolHandler().setFailoverLatch(null); } } @@ -324,6 +329,18 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E { + if (_conn.isFailingOver()) + { + try + { + _conn.blockUntilNotFailingOver(); + } + catch (InterruptedException e) + { + //ignore + } + } + try { return operation.execute(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index acd46da11a..f9a38138ba 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import java.net.URISyntaxException; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.Destination; import javax.naming.NamingException; @@ -59,7 +60,7 @@ public abstract class AMQDestination implements Destination, Referenceable private boolean _browseOnly; - private boolean _isAddressResolved; + private AtomicLong _addressResolved = new AtomicLong(0); private AMQShortString _queueName; @@ -77,7 +78,7 @@ public abstract class AMQDestination implements Destination, Referenceable public static final int QUEUE_TYPE = 1; public static final int TOPIC_TYPE = 2; public static final int UNKNOWN_TYPE = 3; - + // ----- Fields required to support new address syntax ------- public enum DestSyntax { @@ -740,12 +741,12 @@ public abstract class AMQDestination implements Destination, Referenceable public boolean isAddressResolved() { - return _isAddressResolved; + return _addressResolved.get() > 0; } - public void setAddressResolved(boolean addressResolved) + public void setAddressResolved(long addressResolved) { - _isAddressResolved = addressResolved; + _addressResolved.set(addressResolved); } private static Address createAddressFromString(String str) @@ -823,7 +824,7 @@ public abstract class AMQDestination implements Destination, Referenceable dest.setTargetNode(_targetNode); dest.setSourceNode(_sourceNode); dest.setLink(_link); - dest.setAddressResolved(_isAddressResolved); + dest.setAddressResolved(_addressResolved.get()); return dest; } @@ -836,4 +837,9 @@ public abstract class AMQDestination implements Destination, Referenceable { _isDurable = b; } + + public boolean isResolvedAfter(long time) + { + return _addressResolved.get() > time; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index c6a64ec894..2869e96a87 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1179,8 +1179,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic boolean isConsumer, boolean noWait) throws AMQException { - if (dest.isAddressResolved()) - { + if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime())) + { if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) { createSubscriptionQueue(dest); @@ -1269,7 +1269,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic "The name '" + dest.getAddressName() + "' supplied in the address doesn't resolve to an exchange or a queue"); } - dest.setAddressResolved(true); + dest.setAddressResolved(System.currentTimeMillis()); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 57f64c2f92..16afa51c74 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -238,7 +238,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer } catch (Exception e) { - JMSException jmse = new JMSException("Exception when sending message"); + JMSException jmse = new JMSException("Exception when sending message:" + e.getMessage()); jmse.setLinkedException(e); jmse.initCause(e); throw jmse; diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 1c521244d0..06c5c83031 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -526,10 +526,6 @@ public class Connection extends ConnectionInvoker { synchronized (lock) { - for (Session ssn : channels.values()) - { - ssn.closeCode(close); - } ConnectionCloseCode code = close.getReplyCode(); if (code != ConnectionCloseCode.NORMAL) { @@ -705,4 +701,13 @@ public class Connection extends ConnectionInvoker { return sessions.containsKey(new Binary(name.getBytes())); } + + public void notifyFailoverRequired() + { + List<Session> values = new ArrayList<Session>(channels.values()); + for (Session ssn : values) + { + ssn.notifyFailoverRequired(); + } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index b732191707..321e5256b2 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -50,6 +50,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Session @@ -125,6 +126,8 @@ public class Session extends SessionInvoker private SessionDetachCode detachCode; private final Object stateLock = new Object(); + private final AtomicBoolean _failoverRequired = new AtomicBoolean(false); + protected Session(Connection connection, Binary name, long expiry) { this(connection, new SessionDelegate(), name, expiry); @@ -257,6 +260,7 @@ public class Session extends SessionInvoker void resume() { + _failoverRequired.set(false); synchronized (commands) { attach(); @@ -459,7 +463,7 @@ public class Session extends SessionInvoker synchronized (commands) { - if (state == DETACHED || state == CLOSING) + if (state == DETACHED || state == CLOSING || state == CLOSED) { return; } @@ -595,11 +599,12 @@ public class Session extends SessionInvoker if (state != OPEN && state != CLOSED && state != CLOSING) { Thread current = Thread.currentThread(); - if (!current.equals(resumer)) + if (!current.equals(resumer) ) { Waiter w = new Waiter(commands, timeout); while (w.hasTime() && (state != OPEN && state != CLOSED)) { + checkFailoverRequired("Command was interrupted because of failover, before being sent"); w.await(); } } @@ -668,6 +673,7 @@ public class Session extends SessionInvoker } } } + checkFailoverRequired("Command was interrupted because of failover, before being sent"); w.await(); } } @@ -762,6 +768,14 @@ public class Session extends SessionInvoker } } + private void checkFailoverRequired(String message) + { + if (_failoverRequired.get()) + { + throw new SessionException(message); + } + } + protected boolean shouldIssueFlush(int next) { return (next % 65536) == 0; @@ -787,6 +801,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { + checkFailoverRequired("Session sync was interrupted by failover."); log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands); w.await(); } @@ -847,13 +862,6 @@ public class Session extends SessionInvoker } } - private ConnectionClose close = null; - - void closeCode(ConnectionClose close) - { - this.close = close; - } - ExecutionException getException() { synchronized (results) @@ -904,6 +912,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(this, timeout); while (w.hasTime() && state != CLOSED && !isDone()) { + checkFailoverRequired("Operation was interrupted by failover."); log.debug("%s waiting for result: %s", Session.this, this); w.await(); } @@ -915,7 +924,12 @@ public class Session extends SessionInvoker } else if (state == CLOSED) { - throw new SessionException(getException()); + ExecutionException ex = getException(); + if(ex == null) + { + throw new SessionClosedException(); + } + throw new SessionException(ex); } else { @@ -995,6 +1009,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && state != CLOSED) { + checkFailoverRequired("close() was interrupted by failover."); w.await(); } @@ -1089,6 +1104,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(stateLock, timeout); while (w.hasTime() && state == NEW) { + checkFailoverRequired("Session opening was interrupted by failover."); w.await(); } } @@ -1111,4 +1127,26 @@ public class Session extends SessionInvoker { return stateLock; } + + protected void notifyFailoverRequired() + { + //ensure any operations waiting are aborted to + //prevent them waiting for timeout for 60 seconds + //and possibly preventing failover proceeding + _failoverRequired.set(true); + synchronized (commands) + { + commands.notifyAll(); + } + synchronized (results) + { + for (ResultFuture<?> result : results.values()) + { + synchronized(result) + { + result.notifyAll(); + } + } + } + } } diff --git a/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java new file mode 100644 index 0000000000..980fc7285d --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java @@ -0,0 +1,14 @@ +package org.apache.qpid.client.failover; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; + +public class AddressBasedFailoverBehaviourTest extends FailoverBehaviourTest +{ + @Override + protected Destination createDestination(Session session) throws JMSException + { + return session.createQueue("ADDR:" +getTestQueueName() + "_" + System.currentTimeMillis() + "; {create: always}"); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java index 66f8fe0546..a5b9c618bc 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java +++ b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java @@ -20,6 +20,7 @@ package org.apache.qpid.client.failover; import java.text.MessageFormat; import java.util.ArrayList; +import java.util.Enumeration; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -33,12 +34,15 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TransactionRolledBackException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.test.utils.FailoverBaseCase; /** @@ -96,6 +100,11 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio */ private JMSException _exceptionListenerException; + /** + * Latch to check that failover mutex is hold by a failover thread + */ + private CountDownLatch _failoverStarted; + @Override protected void setUp() throws Exception { @@ -105,6 +114,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio _connection.setExceptionListener(this); ((AMQConnection) _connection).setConnectionListener(this); _failoverComplete = new CountDownLatch(1); + _failoverStarted = new CountDownLatch(1); } /** @@ -625,8 +635,134 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE); } + public void testPublishAutoAcknowledgedWhileFailover() throws Exception + { + publishWhileFailingOver(Session.AUTO_ACKNOWLEDGE); + } + + public void testPublishClientAcknowledgedWhileFailover() throws Exception + { + Message receivedMessage = publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE); + receivedMessage.acknowledge(); + } + + public void testPublishTransactedAcknowledgedWhileFailover() throws Exception + { + publishWhileFailingOver(Session.SESSION_TRANSACTED); + _consumerSession.commit(); + } + + public void testPublishAutoAcknowledgedWithFailoverMutex() throws Exception + { + publishWithFailoverMutex(Session.AUTO_ACKNOWLEDGE); + } + + public void testPublishClientAcknowledgedWithFailoverMutex() throws Exception + { + publishWithFailoverMutex(Session.CLIENT_ACKNOWLEDGE); + + } + + public void testPublishTransactedAcknowledgedWithFailoverMutex() throws Exception + { + publishWithFailoverMutex(Session.SESSION_TRANSACTED); + } + + public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception + { + sessionCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE); + } + + public void testTransactedSessionCloseWhileFailover() throws Exception + { + sessionCloseWhileFailoverImpl(Session.SESSION_TRANSACTED); + } + + public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception + { + sessionCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE); + } + + public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception + { + browserCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE); + } + + public void testTransactedQueueBrowserCloseWhileFailover() throws Exception + { + browserCloseWhileFailoverImpl(Session.SESSION_TRANSACTED); + } + + public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception + { + browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE); + } + + private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException + { + setDelayedFailoverPolicy(5); + init(autoAcknowledge, true); + + String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0); + Message message = _producerSession.createTextMessage(text); + + failBroker(getFailingPort()); + + if(!_failoverStarted.await(5, TimeUnit.SECONDS)) + { + fail("Did not receieve notification failover had started"); + } + + _producer.send(message); + + if (_producerSession.getTransacted()) + { + _producerSession.commit(); + } + + Message receivedMessage = _consumer.receive(1000l); + assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); + return receivedMessage; + } + + private void publishWithFailoverMutex(int autoAcknowledge) throws JMSException, InterruptedException + { + setDelayedFailoverPolicy(5); + init(autoAcknowledge, true); + + String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0); + Message message = _producerSession.createTextMessage(text); + + AMQConnection connection = (AMQConnection)_connection; + + // holding failover mutex should prevent the failover from + // proceeding before we try to send the message + synchronized(connection.getFailoverMutex()) + { + failBroker(getFailingPort()); + + // wait to make sure that connection is lost + while(!connection.isFailingOver()) + { + Thread.sleep(25l); + } + + try + { + _producer.send(message); + fail("Sending should fail because connection was lost and failover has not yet completed"); + } + catch(JMSException e) + { + // JMSException is expected + } + } + // wait for failover completion, thus ensuring it actually + //got started, before allowing the test to tear down + awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME); + } /** - * Tests {@link Session#close()} for session with given acknowledge mode + * Tests {@link Session#close()} for session with given acknowledge mode * to ensure that close works after failover. * * @param acknowledgeMode session acknowledge mode @@ -671,7 +807,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false; _consumerSession = _connection.createSession(isTransacted, acknowledgeMode); - _destination = _consumerSession.createQueue(getTestQueueName() + "_" + System.currentTimeMillis()); + _destination = createDestination(_consumerSession); _consumer = _consumerSession.createConsumer(_destination); if (startConnection) @@ -684,6 +820,11 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio } + protected Destination createDestination(Session session) throws JMSException + { + return session.createQueue(getTestQueueName() + "_" + System.currentTimeMillis()); + } + /** * Resends messages if reconnected to a non-clustered broker * @@ -879,6 +1020,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio @Override public boolean preFailover(boolean redirect) { + _failoverStarted.countDown(); return true; } @@ -900,6 +1042,39 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio _exceptionListenerException = e; } + /** + * Causes 1 second delay before reconnect in order to test whether JMS + * methods block while failover is in progress + */ + private static class DelayingFailoverPolicy extends FailoverPolicy + { + + private CountDownLatch _suspendLatch; + private long _delay; + + public DelayingFailoverPolicy(AMQConnection connection, long delay) + { + super(connection.getConnectionURL(), connection); + _suspendLatch = new CountDownLatch(1); + _delay = delay; + } + + public void attainedConnection() + { + try + { + _suspendLatch.await(_delay, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + // continue + } + super.attainedConnection(); + } + + } + + private class FailoverTestMessageListener implements MessageListener { // message counter @@ -946,4 +1121,101 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio return _counter.get(); } } + + /** + * Tests {@link Session#close()} for session with given acknowledge mode + * to ensure that it blocks until failover implementation restores connection. + * + * @param acknowledgeMode session acknowledge mode + * @throws JMSException + */ + private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException + { + initDelayedFailover(acknowledgeMode); + + // intentionally receive message but not commit or acknowledge it in + // case of transacted or CLIENT_ACK session + Message receivedMessage = _consumer.receive(1000l); + assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); + + failBroker(getFailingPort()); + + // test whether session#close blocks while failover is in progress + _consumerSession.close(); + + assertFailoverException(); + } + + /** + * A helper method to instantiate {@link QueueBrowser} and publish test messages on a test queue for further browsing. + * + * @param acknowledgeMode session acknowledge mode + * @return queue browser + * @throws JMSException + */ + private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException + { + init(acknowledgeMode, false); + _consumer.close(); + QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination); + _connection.start(); + + produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false); + if (acknowledgeMode == Session.SESSION_TRANSACTED) + { + _producerSession.commit(); + } + return browser; + } + + /** + * Tests {@link QueueBrowser#close()} for session with given acknowledge mode + * to ensure that it blocks until failover implementation restores connection. + * + * @param acknowledgeMode session acknowledge mode + * @throws JMSException + */ + private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException + { + setDelayedFailoverPolicy(); + + QueueBrowser browser = prepareQueueBrowser(acknowledgeMode); + + @SuppressWarnings("unchecked") + Enumeration<Message> messages = browser.getEnumeration(); + Message receivedMessage = (Message) messages.nextElement(); + assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); + + failBroker(getFailingPort()); + + browser.close(); + + assertFailoverException(); + } + + private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException + { + DelayingFailoverPolicy failoverPolicy = setDelayedFailoverPolicy(); + init(acknowledgeMode, true); + produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false); + if (acknowledgeMode == Session.SESSION_TRANSACTED) + { + _producerSession.commit(); + } + return failoverPolicy; + } + + private DelayingFailoverPolicy setDelayedFailoverPolicy() + { + return setDelayedFailoverPolicy(2); + } + + private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay) + { + AMQConnection amqConnection = (AMQConnection) _connection; + DelayingFailoverPolicy failoverPolicy = new DelayingFailoverPolicy(amqConnection, delay); + ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy); + return failoverPolicy; + } + } diff --git a/java/test-profiles/JavaPre010Excludes b/java/test-profiles/JavaPre010Excludes index 8344cd98c2..68feaf1e2b 100644 --- a/java/test-profiles/JavaPre010Excludes +++ b/java/test-profiles/JavaPre010Excludes @@ -28,6 +28,7 @@ org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend org.apache.qpid.test.client.message.JMSDestinationTest#testGetDestinationWithCustomExchange // The new addressing based syntax is not supported for AMQP 0-8/0-9 versions +org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#* org.apache.qpid.test.client.destination.AddressBasedDestinationTest#* org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy |
