summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java12
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java23
2 files changed, 28 insertions, 7 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 7fa7004a9e..cb6c196a45 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -147,8 +147,16 @@ public class FailoverHandler implements Runnable
// So lets make a new one.
_amqProtocolHandler.setStateManager(new AMQStateManager());
- // Close the session, false says don't wait for it to close, just close it.
- _amqProtocolHandler.getProtocolSession().closeProtocolSession(false);
+ // Close the session, we need to wait for it to close as there may have
+ // been data in transit such as an ack that is still valid to send.
+ //
+ // While we are allowing data to continue to be written to the
+ // socket assuming the connection is still valid, we do not consider
+ // the possibility that the problem that triggered failover was
+ // entirely client side. In that situation the socket will still be
+ // open and the we should really send a ConnectionClose to be AMQP
+ // compliant.
+ _amqProtocolHandler.getProtocolSession().closeProtocolSession();
// Use a fresh new StateManager for the reconnection attempts
_amqProtocolHandler.setStateManager(new AMQStateManager());
diff --git a/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java b/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
index 978b7f1c22..2d688bcc51 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
@@ -179,9 +179,16 @@ public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implem
messages.remove(0).getIntProperty("count"),
received.getIntProperty("count"));
- // Allow ack to be sent to broker, by performing a synchronous command
- // along the session.
-// _session.createConsumer(_session.createTemporaryQueue()).close();
+ // When the Exception is received by the underlying IO layer it will
+ // initiate failover. The first step of which is to ensure that the
+ // existing conection is closed. So in this situation the connection
+ // will be flushed casuing the above ACK to be sent to the broker.
+ //
+ // That said:
+ // when the socket close is detected on the server it will rise up the
+ // Mina filter chain and interrupt processing.
+ // this has been raised as QPID-2138
+ _session.createConsumer(_session.createTemporaryQueue()).close();
//Retain IO Layer
AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession();
@@ -261,8 +268,14 @@ public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implem
private void initialiseConnection()
throws Exception
{
- //Create Connection
- _connection = (AMQConnection) getConnection();
+ //Create Connection using the default connection URL. i.e. not the Failover URL that would be used by default
+ _connection = (AMQConnection) getConnection(getConnectionFactory("default").getConnectionURL());
+ // The default connection does not have any retries configured so
+ // Allow this connection to retry so that we can block on the failover.
+ // The alternative would be to use the getConnection() default. However,
+ // this would add additional complexity in the logging as a second
+ // broker is defined in that url. We do not need it for this test.
+ _connection.getFailoverPolicy().getCurrentMethod().setRetries(1);
_connection.setConnectionListener(this);
_session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);