diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-04-11 01:00:39 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-04-11 01:00:39 +0000 |
| commit | 8f5fad40053d70fcca012fe58ffd054d0d9444c4 (patch) | |
| tree | 515e199a1eab1047ad35bca74e0fb74d2856605a | |
| parent | 2a2679ef8cb7f57d1cc167eba57de7032386d9ea (diff) | |
| download | qpid-python-8f5fad40053d70fcca012fe58ffd054d0d9444c4.tar.gz | |
QPID-1779 : Application of patches attached to JIRA. Should address connection close issues experienced on 0-8/9 branch
Excluded test from TCP runs as it is hardwired to InVM.
merged from trunk r764109
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5-release@764129 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 158 insertions, 12 deletions
diff --git a/qpid/java/010ExcludeList b/qpid/java/010ExcludeList index 94102ce09c..936dd762d5 100644 --- a/qpid/java/010ExcludeList +++ b/qpid/java/010ExcludeList @@ -64,3 +64,6 @@ org.apache.qpid.test.client.QueueBrowsingFlowToDiskTest#* // This test currently does not pick up the runtime location of the nonVm queueBacking store. org.apache.qpid.test.unit.client.close.FlowToDiskBackingQueueDeleteTest#* +// This test may use QpidTestCase but it is not using the getConnection and is hardwired to InVM +org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#* + diff --git a/qpid/java/08ExcludeList-nonvm b/qpid/java/08ExcludeList-nonvm index a529a2fa87..c2f263c870 100644 --- a/qpid/java/08ExcludeList-nonvm +++ b/qpid/java/08ExcludeList-nonvm @@ -32,3 +32,5 @@ org.apache.qpid.client.MessageListenerTest#testSynchronousRecieveNoWait // This test currently does not pick up the runtime location of the nonVm queueBacking store. org.apache.qpid.test.unit.client.close.FlowToDiskBackingQueueDeleteTest#* +// This test may use QpidTestCase but it is not using the getConnection and is hardwired to InVM +org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#* diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 5c48d73e43..c09b05bda8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -58,6 +58,7 @@ import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -924,7 +925,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (!_closed.getAndSet(true)) { - doClose(sessions, timeout); + _closing.set(true); + try{ + doClose(sessions, timeout); + }finally{ + _closing.set(false); + } } } @@ -1318,7 +1324,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // in the case of an IOException, MINA has closed the protocol session so we set _closed to true // so that any generic client code that tries to close the connection will not mess up this error // handling sequence - if (cause instanceof IOException) + if (cause instanceof IOException || cause instanceof AMQDisconnectedException) { closer = !_closed.getAndSet(true); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 22f3b97ecf..9f208b67fb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -282,7 +282,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Holds the dispatcher thread for this session. */ protected Dispatcher _dispatcher; - + protected Thread _dispatcherThread; /** Holds the message factory factory for this session. */ @@ -644,7 +644,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - sendClose(timeout); + // IF we are closing then send the close. + if (_connection.isClosing()) + { + sendClose(timeout); + } } catch (AMQException e) { @@ -1218,9 +1222,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // this is done so that we can produce to a temporary queue before we create a consumer result.setQueueName(result.getRoutingKey()); - createQueue(result.getAMQQueueName(), result.isAutoDelete(), + createQueue(result.getAMQQueueName(), result.isAutoDelete(), result.isDurable(), result.isExclusive()); - bindQueue(result.getAMQQueueName(), result.getRoutingKey(), + bindQueue(result.getAMQQueueName(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(), result); return result; } @@ -1682,11 +1686,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // if (rawSelector != null) // ft.put("headers", rawSelector.getDataAsBytes()); // rawSelector is used by HeadersExchange and is not a JMS Selector - if (rawSelector != null) + if (rawSelector != null) { ft.addAll(rawSelector); } - + if (messageSelector != null) { ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector); @@ -1936,13 +1940,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _dispatcher = new Dispatcher(); try { - _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher); - + _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher); + } catch(Exception e) { throw new Error("Error creating Dispatcher thread",e); - } + } _dispatcherThread.setName("Dispatcher-Channel-" + _channelId); _dispatcherThread.setDaemon(true); _dispatcher.setConnectionStopped(initiallyStopped); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 2bb443a090..a5f5e5f5fa 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -563,7 +563,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { try { - sendCancel(); + if (!_connection.isClosing()) + { + sendCancel(); + } } catch (AMQException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java b/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java index 7e119343a1..e6771e122c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java @@ -52,6 +52,13 @@ public abstract class Closeable protected final AtomicBoolean _closed = new AtomicBoolean(false); /** + * Are we in the process of closing. We have this distinction so we can + * still signal we are in the process of closing so other objects can tell + * the difference and tidy up. + */ + protected final AtomicBoolean _closing = new AtomicBoolean(false); + + /** * Checks if this is closed, and raises a JMSException if it is. * * @throws JMSException If this is closed. @@ -75,6 +82,17 @@ public abstract class Closeable } /** + * Checks if this is closis. + * + * @return <tt>true</tt> if we are closing, <tt>false</tt> otherwise. + */ + public boolean isClosing() + { + return _closing.get(); + } + + + /** * Closes this object. * * @throws JMSException If this cannot be closed for any reason. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java new file mode 100644 index 0000000000..1cb24919f0 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.util.concurrent.CountDownLatch; + +public class CloseAfterConnectionFailureTest extends QpidTestCase implements ExceptionListener +{ + private int sessionCount = 0; + AMQConnection connection; + Session session; + MessageConsumer consumer; + private CountDownLatch _latch = new CountDownLatch(1); + + public void testNoFailover() throws URLSyntaxException, AMQVMBrokerCreationException, + InterruptedException, JMSException + { + String connectionString = "amqp://guest:guest@/test?brokerlist='vm://:1?connectdelay='500',retries='3'',failover='nofailover'"; + + AMQConnectionURL url = new AMQConnectionURL(connectionString); + + try + { + //Start the connection so it will use the retries + connection = new AMQConnection(url, null); + + connection.setExceptionListener(this); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = session.createConsumer(session.createQueue(this.getName())); + + //Kill connection + TransportConnection.killAllVMBrokers(); + _latch.await(); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + public void onException(JMSException e) + { + System.err.println("Connection isClosed after connection Falure?:" + connection.isClosed()); + try + { + consumer.close(); + } + catch (JMSException jsme) + { + System.err.println("Consumer close failed with:" + jsme.getMessage()); + } + System.err.println("Connection isClosed after connection Falure?:" + connection.isClosed()); + try + { + //Note that if we actually do session.close() we will lock up as the session will never receive a frame + // from the + ((AMQSession)session).close(10); + } + catch (JMSException jsme) + { + System.err.println("Session close failed with:" + jsme.getMessage()); + } + System.err.println("Connection isClosed after connection Falure?:" + connection.isClosed()); + + try + { + connection.close(); + } + catch (JMSException jsme) + { + System.err.println("Session close failed with:" + jsme.getMessage()); + } + System.err.println("Connection isClosed after connection Falure?:" + connection.isClosed()); + + _latch.countDown(); + } + +} |
