summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java47
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java58
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java14
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java276
-rw-r--r--java/test-profiles/JavaPre010Excludes1
10 files changed, 404 insertions, 41 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index f15af72407..941534c7ff 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -175,6 +175,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// new amqp-0-10 encoded format.
private boolean _useLegacyMapMessageFormat;
+ //used to track the last failover time for
+ //Address resolution purposes
+ private volatile long _lastFailoverTime = 0;
+
/**
* @param broker brokerdetails
* @param username username
@@ -1076,6 +1080,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
public boolean firePreFailover(boolean redirect)
{
+ _lastFailoverTime = System.currentTimeMillis();
boolean proceed = true;
if (_connectionListener != null)
{
@@ -1462,4 +1467,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
}
+
+ public long getLastFailoverTime()
+ {
+ return _lastFailoverTime;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 0ed3db6ecb..0d48dd5822 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -281,24 +281,29 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
_conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
- try
+ _qpidConnection.notifyFailoverRequired();
+
+ synchronized (_conn.getFailoverMutex())
{
- if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ try
{
- _conn.failoverPrep();
- _conn.resubscribeSessions();
- _conn.fireFailoverComplete();
- return;
+ if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ {
+ _conn.failoverPrep();
+ _conn.resubscribeSessions();
+ _conn.fireFailoverComplete();
+ return;
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("error during failover", e);
+ }
+ finally
+ {
+ _conn.getProtocolHandler().getFailoverLatch().countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
}
- }
- catch (Exception e)
- {
- _logger.error("error during failover", e);
- }
- finally
- {
- _conn.getProtocolHandler().getFailoverLatch().countDown();
- _conn.getProtocolHandler().setFailoverLatch(null);
}
}
@@ -324,6 +329,18 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
{
+ if (_conn.isFailingOver())
+ {
+ try
+ {
+ _conn.blockUntilNotFailingOver();
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+ }
+
try
{
return operation.execute();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index acd46da11a..f9a38138ba 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import java.net.URISyntaxException;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Destination;
import javax.naming.NamingException;
@@ -59,7 +60,7 @@ public abstract class AMQDestination implements Destination, Referenceable
private boolean _browseOnly;
- private boolean _isAddressResolved;
+ private AtomicLong _addressResolved = new AtomicLong(0);
private AMQShortString _queueName;
@@ -77,7 +78,7 @@ public abstract class AMQDestination implements Destination, Referenceable
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
-
+
// ----- Fields required to support new address syntax -------
public enum DestSyntax {
@@ -740,12 +741,12 @@ public abstract class AMQDestination implements Destination, Referenceable
public boolean isAddressResolved()
{
- return _isAddressResolved;
+ return _addressResolved.get() > 0;
}
- public void setAddressResolved(boolean addressResolved)
+ public void setAddressResolved(long addressResolved)
{
- _isAddressResolved = addressResolved;
+ _addressResolved.set(addressResolved);
}
private static Address createAddressFromString(String str)
@@ -823,7 +824,7 @@ public abstract class AMQDestination implements Destination, Referenceable
dest.setTargetNode(_targetNode);
dest.setSourceNode(_sourceNode);
dest.setLink(_link);
- dest.setAddressResolved(_isAddressResolved);
+ dest.setAddressResolved(_addressResolved.get());
return dest;
}
@@ -836,4 +837,9 @@ public abstract class AMQDestination implements Destination, Referenceable
{
_isDurable = b;
}
+
+ public boolean isResolvedAfter(long time)
+ {
+ return _addressResolved.get() > time;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index c6a64ec894..2869e96a87 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -1179,8 +1179,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
boolean isConsumer,
boolean noWait) throws AMQException
{
- if (dest.isAddressResolved())
- {
+ if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime()))
+ {
if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType())
{
createSubscriptionQueue(dest);
@@ -1269,7 +1269,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
"The name '" + dest.getAddressName() +
"' supplied in the address doesn't resolve to an exchange or a queue");
}
- dest.setAddressResolved(true);
+ dest.setAddressResolved(System.currentTimeMillis());
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 57f64c2f92..16afa51c74 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -238,7 +238,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
}
catch (Exception e)
{
- JMSException jmse = new JMSException("Exception when sending message");
+ JMSException jmse = new JMSException("Exception when sending message:" + e.getMessage());
jmse.setLinkedException(e);
jmse.initCause(e);
throw jmse;
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 1c521244d0..06c5c83031 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -526,10 +526,6 @@ public class Connection extends ConnectionInvoker
{
synchronized (lock)
{
- for (Session ssn : channels.values())
- {
- ssn.closeCode(close);
- }
ConnectionCloseCode code = close.getReplyCode();
if (code != ConnectionCloseCode.NORMAL)
{
@@ -705,4 +701,13 @@ public class Connection extends ConnectionInvoker
{
return sessions.containsKey(new Binary(name.getBytes()));
}
+
+ public void notifyFailoverRequired()
+ {
+ List<Session> values = new ArrayList<Session>(channels.values());
+ for (Session ssn : values)
+ {
+ ssn.notifyFailoverRequired();
+ }
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index b732191707..321e5256b2 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -50,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Session
@@ -125,6 +126,8 @@ public class Session extends SessionInvoker
private SessionDetachCode detachCode;
private final Object stateLock = new Object();
+ private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
+
protected Session(Connection connection, Binary name, long expiry)
{
this(connection, new SessionDelegate(), name, expiry);
@@ -257,6 +260,7 @@ public class Session extends SessionInvoker
void resume()
{
+ _failoverRequired.set(false);
synchronized (commands)
{
attach();
@@ -459,7 +463,7 @@ public class Session extends SessionInvoker
synchronized (commands)
{
- if (state == DETACHED || state == CLOSING)
+ if (state == DETACHED || state == CLOSING || state == CLOSED)
{
return;
}
@@ -595,11 +599,12 @@ public class Session extends SessionInvoker
if (state != OPEN && state != CLOSED && state != CLOSING)
{
Thread current = Thread.currentThread();
- if (!current.equals(resumer))
+ if (!current.equals(resumer) )
{
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && (state != OPEN && state != CLOSED))
{
+ checkFailoverRequired("Command was interrupted because of failover, before being sent");
w.await();
}
}
@@ -668,6 +673,7 @@ public class Session extends SessionInvoker
}
}
}
+ checkFailoverRequired("Command was interrupted because of failover, before being sent");
w.await();
}
}
@@ -762,6 +768,14 @@ public class Session extends SessionInvoker
}
}
+ private void checkFailoverRequired(String message)
+ {
+ if (_failoverRequired.get())
+ {
+ throw new SessionException(message);
+ }
+ }
+
protected boolean shouldIssueFlush(int next)
{
return (next % 65536) == 0;
@@ -787,6 +801,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
+ checkFailoverRequired("Session sync was interrupted by failover.");
log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands);
w.await();
}
@@ -847,13 +862,6 @@ public class Session extends SessionInvoker
}
}
- private ConnectionClose close = null;
-
- void closeCode(ConnectionClose close)
- {
- this.close = close;
- }
-
ExecutionException getException()
{
synchronized (results)
@@ -904,6 +912,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(this, timeout);
while (w.hasTime() && state != CLOSED && !isDone())
{
+ checkFailoverRequired("Operation was interrupted by failover.");
log.debug("%s waiting for result: %s", Session.this, this);
w.await();
}
@@ -915,7 +924,12 @@ public class Session extends SessionInvoker
}
else if (state == CLOSED)
{
- throw new SessionException(getException());
+ ExecutionException ex = getException();
+ if(ex == null)
+ {
+ throw new SessionClosedException();
+ }
+ throw new SessionException(ex);
}
else
{
@@ -995,6 +1009,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && state != CLOSED)
{
+ checkFailoverRequired("close() was interrupted by failover.");
w.await();
}
@@ -1089,6 +1104,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(stateLock, timeout);
while (w.hasTime() && state == NEW)
{
+ checkFailoverRequired("Session opening was interrupted by failover.");
w.await();
}
}
@@ -1111,4 +1127,26 @@ public class Session extends SessionInvoker
{
return stateLock;
}
+
+ protected void notifyFailoverRequired()
+ {
+ //ensure any operations waiting are aborted to
+ //prevent them waiting for timeout for 60 seconds
+ //and possibly preventing failover proceeding
+ _failoverRequired.set(true);
+ synchronized (commands)
+ {
+ commands.notifyAll();
+ }
+ synchronized (results)
+ {
+ for (ResultFuture<?> result : results.values())
+ {
+ synchronized(result)
+ {
+ result.notifyAll();
+ }
+ }
+ }
+ }
}
diff --git a/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java
new file mode 100644
index 0000000000..980fc7285d
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java
@@ -0,0 +1,14 @@
+package org.apache.qpid.client.failover;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+public class AddressBasedFailoverBehaviourTest extends FailoverBehaviourTest
+{
+ @Override
+ protected Destination createDestination(Session session) throws JMSException
+ {
+ return session.createQueue("ADDR:" +getTestQueueName() + "_" + System.currentTimeMillis() + "; {create: always}");
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
index 66f8fe0546..a5b9c618bc 100644
--- a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
@@ -20,6 +20,7 @@ package org.apache.qpid.client.failover;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -33,12 +34,15 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.test.utils.FailoverBaseCase;
/**
@@ -96,6 +100,11 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
*/
private JMSException _exceptionListenerException;
+ /**
+ * Latch to check that failover mutex is hold by a failover thread
+ */
+ private CountDownLatch _failoverStarted;
+
@Override
protected void setUp() throws Exception
{
@@ -105,6 +114,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
_connection.setExceptionListener(this);
((AMQConnection) _connection).setConnectionListener(this);
_failoverComplete = new CountDownLatch(1);
+ _failoverStarted = new CountDownLatch(1);
}
/**
@@ -625,8 +635,134 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE);
}
+ public void testPublishAutoAcknowledgedWhileFailover() throws Exception
+ {
+ publishWhileFailingOver(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testPublishClientAcknowledgedWhileFailover() throws Exception
+ {
+ Message receivedMessage = publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE);
+ receivedMessage.acknowledge();
+ }
+
+ public void testPublishTransactedAcknowledgedWhileFailover() throws Exception
+ {
+ publishWhileFailingOver(Session.SESSION_TRANSACTED);
+ _consumerSession.commit();
+ }
+
+ public void testPublishAutoAcknowledgedWithFailoverMutex() throws Exception
+ {
+ publishWithFailoverMutex(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testPublishClientAcknowledgedWithFailoverMutex() throws Exception
+ {
+ publishWithFailoverMutex(Session.CLIENT_ACKNOWLEDGE);
+
+ }
+
+ public void testPublishTransactedAcknowledgedWithFailoverMutex() throws Exception
+ {
+ publishWithFailoverMutex(Session.SESSION_TRANSACTED);
+ }
+
+ public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception
+ {
+ sessionCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testTransactedSessionCloseWhileFailover() throws Exception
+ {
+ sessionCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
+ }
+
+ public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception
+ {
+ sessionCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
+ {
+ browserCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testTransactedQueueBrowserCloseWhileFailover() throws Exception
+ {
+ browserCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
+ }
+
+ public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
+ {
+ browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException
+ {
+ setDelayedFailoverPolicy(5);
+ init(autoAcknowledge, true);
+
+ String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
+ Message message = _producerSession.createTextMessage(text);
+
+ failBroker(getFailingPort());
+
+ if(!_failoverStarted.await(5, TimeUnit.SECONDS))
+ {
+ fail("Did not receieve notification failover had started");
+ }
+
+ _producer.send(message);
+
+ if (_producerSession.getTransacted())
+ {
+ _producerSession.commit();
+ }
+
+ Message receivedMessage = _consumer.receive(1000l);
+ assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+ return receivedMessage;
+ }
+
+ private void publishWithFailoverMutex(int autoAcknowledge) throws JMSException, InterruptedException
+ {
+ setDelayedFailoverPolicy(5);
+ init(autoAcknowledge, true);
+
+ String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
+ Message message = _producerSession.createTextMessage(text);
+
+ AMQConnection connection = (AMQConnection)_connection;
+
+ // holding failover mutex should prevent the failover from
+ // proceeding before we try to send the message
+ synchronized(connection.getFailoverMutex())
+ {
+ failBroker(getFailingPort());
+
+ // wait to make sure that connection is lost
+ while(!connection.isFailingOver())
+ {
+ Thread.sleep(25l);
+ }
+
+ try
+ {
+ _producer.send(message);
+ fail("Sending should fail because connection was lost and failover has not yet completed");
+ }
+ catch(JMSException e)
+ {
+ // JMSException is expected
+ }
+ }
+ // wait for failover completion, thus ensuring it actually
+ //got started, before allowing the test to tear down
+ awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
+ }
/**
- * Tests {@link Session#close()} for session with given acknowledge mode
+ * Tests {@link Session#close()} for session with given acknowledge mode
* to ensure that close works after failover.
*
* @param acknowledgeMode session acknowledge mode
@@ -671,7 +807,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false;
_consumerSession = _connection.createSession(isTransacted, acknowledgeMode);
- _destination = _consumerSession.createQueue(getTestQueueName() + "_" + System.currentTimeMillis());
+ _destination = createDestination(_consumerSession);
_consumer = _consumerSession.createConsumer(_destination);
if (startConnection)
@@ -684,6 +820,11 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
}
+ protected Destination createDestination(Session session) throws JMSException
+ {
+ return session.createQueue(getTestQueueName() + "_" + System.currentTimeMillis());
+ }
+
/**
* Resends messages if reconnected to a non-clustered broker
*
@@ -879,6 +1020,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
@Override
public boolean preFailover(boolean redirect)
{
+ _failoverStarted.countDown();
return true;
}
@@ -900,6 +1042,39 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
_exceptionListenerException = e;
}
+ /**
+ * Causes 1 second delay before reconnect in order to test whether JMS
+ * methods block while failover is in progress
+ */
+ private static class DelayingFailoverPolicy extends FailoverPolicy
+ {
+
+ private CountDownLatch _suspendLatch;
+ private long _delay;
+
+ public DelayingFailoverPolicy(AMQConnection connection, long delay)
+ {
+ super(connection.getConnectionURL(), connection);
+ _suspendLatch = new CountDownLatch(1);
+ _delay = delay;
+ }
+
+ public void attainedConnection()
+ {
+ try
+ {
+ _suspendLatch.await(_delay, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ // continue
+ }
+ super.attainedConnection();
+ }
+
+ }
+
+
private class FailoverTestMessageListener implements MessageListener
{
// message counter
@@ -946,4 +1121,101 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
return _counter.get();
}
}
+
+ /**
+ * Tests {@link Session#close()} for session with given acknowledge mode
+ * to ensure that it blocks until failover implementation restores connection.
+ *
+ * @param acknowledgeMode session acknowledge mode
+ * @throws JMSException
+ */
+ private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException
+ {
+ initDelayedFailover(acknowledgeMode);
+
+ // intentionally receive message but not commit or acknowledge it in
+ // case of transacted or CLIENT_ACK session
+ Message receivedMessage = _consumer.receive(1000l);
+ assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+
+ failBroker(getFailingPort());
+
+ // test whether session#close blocks while failover is in progress
+ _consumerSession.close();
+
+ assertFailoverException();
+ }
+
+ /**
+ * A helper method to instantiate {@link QueueBrowser} and publish test messages on a test queue for further browsing.
+ *
+ * @param acknowledgeMode session acknowledge mode
+ * @return queue browser
+ * @throws JMSException
+ */
+ private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException
+ {
+ init(acknowledgeMode, false);
+ _consumer.close();
+ QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination);
+ _connection.start();
+
+ produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
+ if (acknowledgeMode == Session.SESSION_TRANSACTED)
+ {
+ _producerSession.commit();
+ }
+ return browser;
+ }
+
+ /**
+ * Tests {@link QueueBrowser#close()} for session with given acknowledge mode
+ * to ensure that it blocks until failover implementation restores connection.
+ *
+ * @param acknowledgeMode session acknowledge mode
+ * @throws JMSException
+ */
+ private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException
+ {
+ setDelayedFailoverPolicy();
+
+ QueueBrowser browser = prepareQueueBrowser(acknowledgeMode);
+
+ @SuppressWarnings("unchecked")
+ Enumeration<Message> messages = browser.getEnumeration();
+ Message receivedMessage = (Message) messages.nextElement();
+ assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+
+ failBroker(getFailingPort());
+
+ browser.close();
+
+ assertFailoverException();
+ }
+
+ private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException
+ {
+ DelayingFailoverPolicy failoverPolicy = setDelayedFailoverPolicy();
+ init(acknowledgeMode, true);
+ produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
+ if (acknowledgeMode == Session.SESSION_TRANSACTED)
+ {
+ _producerSession.commit();
+ }
+ return failoverPolicy;
+ }
+
+ private DelayingFailoverPolicy setDelayedFailoverPolicy()
+ {
+ return setDelayedFailoverPolicy(2);
+ }
+
+ private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay)
+ {
+ AMQConnection amqConnection = (AMQConnection) _connection;
+ DelayingFailoverPolicy failoverPolicy = new DelayingFailoverPolicy(amqConnection, delay);
+ ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy);
+ return failoverPolicy;
+ }
+
}
diff --git a/java/test-profiles/JavaPre010Excludes b/java/test-profiles/JavaPre010Excludes
index 8344cd98c2..68feaf1e2b 100644
--- a/java/test-profiles/JavaPre010Excludes
+++ b/java/test-profiles/JavaPre010Excludes
@@ -28,6 +28,7 @@ org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend
org.apache.qpid.test.client.message.JMSDestinationTest#testGetDestinationWithCustomExchange
// The new addressing based syntax is not supported for AMQP 0-8/0-9 versions
+org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#*
org.apache.qpid.test.client.destination.AddressBasedDestinationTest#*
org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy
org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy