summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-11-28 00:45:32 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-11-28 00:45:32 +0000
commit0bda021568dda5c6d5a66de72f7b79567fbd40d9 (patch)
treea74fa22f2101625f30f788ced21ca0fd7d021182 /java/client
parent933a38ba979882a96bfa5f9e7db28527c7feaf04 (diff)
downloadqpid-python-0bda021568dda5c6d5a66de72f7b79567fbd40d9.tar.gz
QPID-679 : Patch provided by Aidan Skinner and additional from odd problems during test runs.
AMQChannel - Catch and log AMQException occuring when requeue()-ing. Previously exceptions wouldn't be caught at all. The requeue() is called during closure so there is nothing we can do protocol wise on error other than log the issue and continue with any other shutdown that is needed. AMQMinaProtocolSession & AMQPFastProtocolHandler . Additions to catch and log AMQExceptions. Changes to AMQMinaProtocolSession were done to ignore all input on a closing session other than the close-ok. Previously only Protocol frames were ignored this resulted in Content*Body-s still being processed. Additional checks were made for the MessageStoreClosedException to log and continue. As said else were we need to seperate protocol exceptoions(AMQException) from internal code exception handling. Further All AMQExceptions occuring in the frameReceived method are now caught and logged. Allowing them to propogate higher will only result in thread death. AMQPFastProtocolHandler Caught AMQExceptions occuring whilst closing the session. Again allowing these to continue will result in thread death. There is not a lot that can be done other than log the problem as the session is already closed by this point. Prevented the stacktrace associated with a session exception being printed in the exceptionCaught method when the problem was an IO Exception. This doesn't add anything useful and only adds to the log file sizes. ApplicationRegistry - Added removeAll option which ensures that all ARs are correctly purged so that we can attempt to clean up between Unit Tests. MemoryMessageStore - This was causing us real problems during the failover testing. Similar checks should probably be made to any other Message Store Impl. The issue was that when shutting down the broker the MS.close() method is called this sets all the storage to null. However, there may still be message processing going on as the close() does not attempt to stop connection processing. Hence we now check to see if the Store is close throwing a MSClosedException if required. This prevents NPEs that have been seen during Unit failover testing. In fact the close() is called as a request to shutdown the ApplicationRegistry, but this only occurs from tests and broker shutdown, no attempt to unbind or prevent further connections during this period is yet done. CLIENT CHANGES AMQConnection - Added method to check if failover is in progress. AMQClient - Upgraded acknowledge() exception to JMSException for errors due to failover. Also , added call to update consumers as a result of failover. BasicMessageConsumer - Changes to acquireReceiving to take in to consideration blocking for failover to occur. wrt receiveNoWait.. which previously blocked for failover to complete... not exactly noWait. acknowledge will now TransportConnection - Update to ensure all inVM brokers are correctly killed. FailoverTest - QPID-679 - Finder of all the above problems. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.1@598834 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java82
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java21
4 files changed, 85 insertions, 26 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 9abc94b3df..cbb98a2dd5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1286,4 +1286,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _sessions.get(channelId);
}
+
+ public boolean isFailingOver()
+ {
+ return (_protocolHandler.getFailoverLatch() != null);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index fd795392ee..a052b48426 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -420,7 +420,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @throws IllegalStateException If the session is closed.
*/
- public void acknowledge() throws IllegalStateException
+ public void acknowledge() throws JMSException
{
if (isClosed())
{
@@ -2510,6 +2510,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
for (Iterator it = consumers.iterator(); it.hasNext();)
{
BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
+ consumer.failedOver();
registerConsumer(consumer, true);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 773401d03a..44c10afcf5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -33,14 +33,12 @@ import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
-
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -277,8 +275,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_session.setInRecovery(false);
}
- private void acquireReceiving() throws JMSException
+ /**
+ * @param immediate if true then return immediately if the connection is failing over
+ *
+ * @return boolean if the acquisition was successful
+ *
+ * @throws JMSException
+ * @throws InterruptedException
+ */
+ private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException
{
+ if (_connection.isFailingOver())
+ {
+ if (immediate)
+ {
+ return false;
+ }
+ else
+ {
+ _connection.blockUntilNotFailingOver();
+ }
+ }
+
if (!_receiving.compareAndSet(false, true))
{
throw new javax.jms.IllegalStateException("Another thread is already receiving.");
@@ -290,6 +308,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
_receivingThread = Thread.currentThread();
+ return true;
}
private void releaseReceiving()
@@ -343,7 +362,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
checkPreConditions();
- acquireReceiving();
+ try
+ {
+ acquireReceiving(false);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Interrupted: " + e);
+ if (isClosed())
+ {
+ return null;
+ }
+ }
_session.startDistpatcherIfNecessary();
@@ -424,7 +454,25 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
checkPreConditions();
- acquireReceiving();
+ try
+ {
+ if (!acquireReceiving(true))
+ {
+ //If we couldn't acquire the receiving thread then return null.
+ // This will occur if failing over.
+ return null;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ /*
+ * This seems slightly shoddy but should never actually be executed
+ * since we told acquireReceiving to return immediately and it shouldn't
+ * block on anything.
+ */
+
+ return null;
+ }
_session.startDistpatcherIfNecessary();
@@ -868,11 +916,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- public void acknowledge() // throws JMSException
+ public void acknowledge() throws JMSException
{
- if (!isClosed())
+ if (isClosed())
+ {
+ throw new IllegalStateException("Consumer is closed");
+ }
+ else if (_session.hasFailedOver())
+ {
+ throw new JMSException("has failed over");
+ }
+ else
{
-
Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator();
while (tags.hasNext())
{
@@ -880,10 +935,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
tags.remove();
}
}
- else
- {
- throw new IllegalStateException("Consumer is closed");
- }
}
/** Called on recovery to reset the list of delivery tags */
@@ -1022,4 +1073,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_synchronousQueue.clear();
}
+
+ /** to be called when a failover has occured */
+ public void failedOver()
+ {
+ clearReceiveQueue();
+ clearUnackedMessages();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 040b5f7b68..49d528a4f4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
/**
@@ -99,8 +98,8 @@ public class TransportConnection
if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio"))
{
_logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
- ? "Qpid NIO is new default"
- : "Sysproperty 'qpidnio' is set"));
+ ? "Qpid NIO is new default"
+ : "Sysproperty 'qpidnio' is set"));
result = new MultiThreadSocketConnector();
}
else
@@ -277,8 +276,7 @@ public class TransportConnection
}
AMQVMBrokerCreationException amqbce =
- new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
- amqbce.initCause(e);
+ new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e);
throw amqbce;
}
@@ -291,14 +289,11 @@ public class TransportConnection
_acceptor.unbindAll();
synchronized (_inVmPipeAddress)
{
- Iterator keys = _inVmPipeAddress.keySet().iterator();
-
- while (keys.hasNext())
- {
- int id = (Integer) keys.next();
- _inVmPipeAddress.remove(id);
- }
- }
+ _inVmPipeAddress.clear();
+ }
+ _acceptor = null;
+ _currentInstance = -1;
+ _currentVMPort = -1;
}
public static void killVMBroker(int port)