From b187311574bbf087f376256d237173b38a84fdbc Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Tue, 7 Apr 2015 11:30:50 +0000 Subject: QPID-5818: [Java Broker] Ensure that connection/session use a configuration thread to mutate the model on receipt of close from wire git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1671810 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/model/adapter/ConnectionAdapter.java | 114 +++++++-------------- .../qpid/server/model/adapter/SessionAdapter.java | 2 +- 2 files changed, 39 insertions(+), 77 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index ef760ade6a..0b7be2c28a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; @@ -34,7 +35,6 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.AbstractConfiguredObject; -import org.apache.qpid.server.model.CloseFuture; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Port; @@ -54,8 +54,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject doDelete() { - final SettableFuture returnVal = SettableFuture.create(); - asyncClose().addListener( - new Runnable() - { - @Override - public void run() + if (_underlyingClosed.get()) + { + deleted(); + return Futures.immediateFuture(null); + } + else + { + final SettableFuture returnVal = SettableFuture.create(); + asyncCloseUnderlying().addListener( + new Runnable() { - try - { - deleted(); - setState(State.DELETED); - } - finally + @Override + public void run() { - returnVal.set(null); + try + { + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } } - } - }, getTaskExecutor().getExecutor() - ); - return returnVal; + }, getTaskExecutor().getExecutor() + ); + return returnVal; + } } @Override protected ListenableFuture beforeClose() { - _closing.set(true); + if (_underlyingClosed.get()) + { + return Futures.immediateFuture(null); + } + else + { - return asyncClose(); + return asyncCloseUnderlying(); + } } - private ListenableFuture asyncClose() + private ListenableFuture asyncCloseUnderlying() { final SettableFuture closeFuture = SettableFuture.create(); @@ -206,6 +219,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject impl public void performAction(final Object object) { session.removeDeleteTask(this); - deleted(); + deleteAsync(); } }); setState(State.ACTIVE); -- cgit v1.2.1