summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-19 15:24:27 +0000
committerKeith Wall <kwall@apache.org>2015-02-19 15:24:27 +0000
commitdb70f1d2908f294fee0ed47cdb478c3ab0f3b252 (patch)
treebf08922f63b255a26182700f386bab4406db631d /qpid/java/broker-core
parentc1926054f005af5084e46e6bf8da0c30120c82b4 (diff)
downloadqpid-python-db70f1d2908f294fee0ed47cdb478c3ab0f3b252.tar.gz
Connection close is now performed by i/o thread
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1660909 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java31
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java26
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java86
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java3
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java4
13 files changed, 142 insertions, 33 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index 883785c7ce..a24195075e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -74,7 +74,7 @@ public class ConnectionRegistry implements IConnectionRegistry
AMQConnectionModel connection = itr.next();
try
{
- connection.close(AMQConstant.CONNECTION_FORCED, replyText);
+ connection.closeAsync(AMQConstant.CONNECTION_FORCED, replyText);
}
catch (Exception e)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index b9a4b32acb..baf465f6d1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -490,16 +490,37 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
{
- beforeClose();
- closeChildren();
- onClose();
- unregister(false);
+ CloseFuture close = beforeClose();
+
+ Runnable closeRunnable = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ closeChildren();
+ onClose();
+ unregister(false);
+
+ }
+ };
+
+ if (close == null)
+ {
+ closeRunnable.run();
+ }
+ else
+ {
+ close.runWhenComplete(closeRunnable);
+ }
+
+ // if future not complete, schedule the remainder to be done once complete.
}
}
- protected void beforeClose()
+ protected CloseFuture beforeClose()
{
+ return null;
}
protected void onClose()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java
new file mode 100644
index 0000000000..5e9d794e14
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.model;
+
+
+public interface CloseFuture
+{
+ public void runWhenComplete(final Runnable closeRunnable);
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index 62f0e6ae06..64cfc39e1a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -31,6 +31,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.CloseFuture;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Port;
@@ -51,6 +52,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
private final Action _underlyingConnectionDeleteTask;
private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false);
private AMQConnectionModel _underlyingConnection;
+ private final AtomicBoolean _closing = new AtomicBoolean();
public ConnectionAdapter(final AMQConnectionModel conn)
{
@@ -158,15 +160,42 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
@StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
private void doDelete()
{
- closeUnderlyingConnection();
+ asyncClose();
deleted();
setState(State.DELETED);
}
@Override
+ protected CloseFuture beforeClose()
+ {
+ _closing.set(true);
+
+ final ConnectionCloseFuture closeFuture = asyncClose();
+
+ return closeFuture;
+ }
+
+ private ConnectionCloseFuture asyncClose()
+ {
+ final ConnectionCloseFuture closeFuture = new ConnectionCloseFuture();
+
+ _underlyingConnection.addDeleteTask(new Action()
+ {
+ @Override
+ public void performAction(final Object object)
+ {
+ LOGGER.debug("KWDEBUG underlying connection deleted");
+ closeFuture.connectionClosed();
+ }
+ });
+
+ _underlyingConnection.closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ return closeFuture;
+ }
+
+ @Override
protected void onClose()
{
- closeUnderlyingConnection();
}
@Override
@@ -233,23 +262,54 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
// SessionAdapter installs delete task to cause session model object to delete
}
- private void closeUnderlyingConnection()
+
+ private static class ConnectionCloseFuture implements CloseFuture
{
- if (_underlyingClosed.compareAndSet(false, true))
+ private boolean _closed;
+
+ public synchronized void connectionClosed()
{
- _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask);
- try
+ _closed = true;
+ notifyAll();
+
+ }
+
+ @Override
+ public void runWhenComplete(final Runnable closeRunnable)
+ {
+ if (_closed )
{
- _underlyingConnection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ closeRunnable.run();
}
- catch (Exception e)
+ else
{
- LOGGER.warn("Exception closing connection "
- + _underlyingConnection.getConnectionId()
- + " from "
- + _underlyingConnection.getRemoteAddressString(), e);
- }
+ Thread t = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ synchronized (ConnectionCloseFuture.this)
+ {
+ while (!_closed)
+ {
+ try
+ {
+ ConnectionCloseFuture.this.wait();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ closeRunnable.run();
+ }
+ }
+ });
+
+ t.setDaemon(true);
+ t.start();
+ }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index 353dcd98d6..50cfba5bec 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -40,7 +40,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends
* @param cause
* @param message
*/
- public void close(AMQConstant cause, String message);
+ public void closeAsync(AMQConstant cause, String message);
public void block();
@@ -110,4 +110,5 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends
void notifyWork();
boolean isMessageAssignmentSuspended();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 759f5b8eb7..2ccf595c26 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -214,9 +214,9 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
- _delegate.processPendingMessages();
+ _delegate.processPending();
}
@Override
@@ -260,7 +260,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
}
@@ -418,7 +418,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
index 7a0f43d74d..eba1f78ad0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
@@ -42,7 +42,7 @@ public interface ServerProtocolEngine extends ProtocolEngine
boolean isMessageAssignmentSuspended();
- void processPendingMessages();
+ void processPending();
boolean hasWork();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 65e8a1358d..60999fb2be 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -756,10 +756,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
- protected void beforeClose()
+ protected org.apache.qpid.server.model.CloseFuture beforeClose()
{
_closing = true;
- super.beforeClose();
+ return super.beforeClose();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
index 12ce46eedb..3a32ddd632 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
@@ -287,7 +287,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
_protocolEngine.setMessageAssignmentSuspended(true);
- _protocolEngine.processPendingMessages();
+ _protocolEngine.processPending();
_protocolEngine.setTransportBlockedForWriting(!doWrite());
boolean dataRead = doRead();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 4086a67aae..dce902b126 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -677,9 +677,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
- protected void beforeClose()
+ protected CloseFuture beforeClose()
{
setState(State.UNAVAILABLE);
+ return null;
}
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index caba0bd1d8..1c7cf9d566 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -538,7 +538,7 @@ public class MockConsumer implements ConsumerTarget
}
@Override
- public void close(AMQConstant cause, String message)
+ public void closeAsync(AMQConstant cause, String message)
{
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
index 65f9b4b148..e62a16fdec 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
@@ -55,7 +55,7 @@ public abstract class BaseConnectionActorTestCase extends BaseActorTestCase
}
if (_session != null)
{
- _session.close(AMQConstant.CONNECTION_FORCED, "");
+ _session.closeAsync(AMQConstant.CONNECTION_FORCED, "");
}
}
finally
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
index ba6b0d95f3..4485d5cc85 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
@@ -251,7 +251,7 @@ public class VirtualHostTest extends QpidTestCase
0,
virtualHost.getChildren(Connection.class).size());
- verify(connection).close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ verify(connection).closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
}
public void testDeleteVirtualHost_ClosesConnections()
@@ -276,7 +276,7 @@ public class VirtualHostTest extends QpidTestCase
0,
virtualHost.getChildren(Connection.class).size());
- verify(connection).close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ verify(connection).closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
}
public void testCreateDurableQueue()