From df196f1d27e75b4f82a35e368406910be16c75c6 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Fri, 30 May 2014 16:24:58 +0000 Subject: QPID-5795: [Java Broker] Prevent ConnectionAdapter leak when closing a messaging connection The leak was due to the fact that nothing was telling the virtualhost to unregister the connection child (#unregisterChild) when the connection was closed. * Made ConnectionAdapter responsible for causing its own deletion (when the underlying connection is closed). The call to #deleted() causes the child to be unregistered from its parent (preventing the leak) * Removed the now unnecessary _connectionAdapters map from the VH. This needlessly duplicated information already held more generally by the ACO. * Refactored SessionAdapter in sympathy with CA changes. SessionsAdapters where _not_ being leaked as the session implementation were already telling the model to delete. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1598658 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/model/adapter/ConnectionAdapter.java | 68 +++++----------------- .../qpid/server/model/adapter/SessionAdapter.java | 11 ++++ .../server/virtualhost/AbstractVirtualHost.java | 59 ++----------------- .../protocol/v0_10/ServerConnectionDelegate.java | 1 - .../qpid/server/protocol/v0_8/AMQChannel.java | 4 -- 5 files changed, 32 insertions(+), 111 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index 306013a124..c6a2d46267 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -21,9 +21,7 @@ package org.apache.qpid.server.model.adapter; import java.security.Principal; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -40,22 +38,31 @@ import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.SessionModelListener; +import org.apache.qpid.server.util.Action; public final class ConnectionAdapter extends AbstractConfiguredObject implements Connection, SessionModelListener { private AMQConnectionModel _connection; - private final Map _sessionAdapters = - new HashMap(); - private State _state = State.ACTIVE; public ConnectionAdapter(final AMQConnectionModel conn) { super(parentsMap(conn.getVirtualHost()),createAttributes(conn)); _connection = conn; + + conn.addDeleteTask(new Action() + { + @Override + public void performAction(final Object object) + { + conn.removeDeleteTask(this); + deleted(); + } + }); open(); + conn.addSessionListener(this); } @@ -137,23 +144,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject getSessions() { - synchronized (_sessionAdapters) - { - return new ArrayList(_sessionAdapters.values()); - } - } - - /** - * Retrieve the SessionAdapter instance keyed by the AMQSessionModel from this Connection. - * @param session the AMQSessionModel used to index the SessionAdapter. - * @return the requested SessionAdapter. - */ - SessionAdapter getSessionAdapter(AMQSessionModel session) - { - synchronized (_sessionAdapters) - { - return _sessionAdapters.get(session); - } + return getChildren(Session.class); } @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED) @@ -170,19 +161,6 @@ public final class ConnectionAdapter extends AbstractConfiguredObject Collection getChildren(Class clazz) - { - if(clazz == Session.class) - { - return (Collection) getSessions(); - } - else - { - return Collections.emptySet(); - } - } - @Override public C addChild(Class childClass, Map attributes, ConfiguredObject... otherParents) { @@ -236,27 +214,13 @@ public final class ConnectionAdapter extends AbstractConfiguredObject session) { - synchronized (_sessionAdapters) - { - if(!_sessionAdapters.containsKey(session)) - { - SessionAdapter adapter = new SessionAdapter(this, session); - _sessionAdapters.put(session, adapter); - childAdded(adapter); - } - } + SessionAdapter adapter = new SessionAdapter(this, session); + childAdded(adapter); } @Override public void sessionRemoved(final AMQSessionModel session) { - synchronized (_sessionAdapters) - { - SessionAdapter adapter = _sessionAdapters.remove(session); - if(adapter != null) - { - childRemoved(adapter); - } - } + // SessionAdapter installs delete task to cause session model object to delete } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index 325861e108..689166e951 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.ConsumerListener; +import org.apache.qpid.server.util.Action; final class SessionAdapter extends AbstractConfiguredObject implements Session { @@ -67,6 +68,16 @@ final class SessionAdapter extends AbstractConfiguredObject impl } }); session.setModelObject(this); + session.addDeleteTask(new Action() + { + @Override + public void performAction(final Object object) + { + session.removeDeleteTask(this); + deleted(); + } + }); + open(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 0509985482..d27cd1c13e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -123,9 +123,6 @@ public abstract class AbstractVirtualHost> exte private final EventLogger _eventLogger; - private final Map _connectionAdapters = - new HashMap(); - private final List _aliases = new ArrayList(); private final AtomicBoolean _deleted = new AtomicBoolean(); private final VirtualHostNode _virtualHostNode; @@ -362,26 +359,10 @@ public abstract class AbstractVirtualHost> exte public Collection getConnections() { - synchronized(_connectionAdapters) - { - return new ArrayList(_connectionAdapters.values()); - } + return getChildren(Connection.class); } - /** - * Retrieve the ConnectionAdapter instance keyed by the AMQConnectionModel from this VirtualHost. - * @param connection the AMQConnectionModel used to index the ConnectionAdapter. - * @return the requested ConnectionAdapter. - */ - ConnectionAdapter getConnectionAdapter(AMQConnectionModel connection) - { - synchronized (_connectionAdapters) - { - return _connectionAdapters.get(connection); - } - } - @Override public State getState() { @@ -395,11 +376,7 @@ public abstract class AbstractVirtualHost> exte @Override public Collection getChildren(Class clazz) { - if(clazz == Connection.class) - { - return (Collection) getConnections(); - } - else if(clazz == VirtualHostAlias.class) + if(clazz == VirtualHostAlias.class) { return (Collection) getAliases(); } @@ -815,41 +792,15 @@ public abstract class AbstractVirtualHost> exte { connection.block(); } - ConnectionAdapter adapter = null; - synchronized (_connectionAdapters) - { - if(!_connectionAdapters.containsKey(connection)) - { - adapter = new ConnectionAdapter(connection); - _connectionAdapters.put(connection, adapter); - } - - } - if(adapter != null) - { - childAdded(adapter); - } + Connection c = new ConnectionAdapter(connection); + childAdded(c); } public void connectionUnregistered(final AMQConnectionModel connection) { - ConnectionAdapter adapter; - synchronized (_connectionAdapters) - { - adapter = _connectionAdapters.remove(connection); - - } - - if(adapter != null) - { - // Call getSessions() first to ensure that any SessionAdapter children are cleanly removed and any - // corresponding ConfigurationChangeListener childRemoved() callback is called for child SessionAdapters. - adapter.getSessions(); - - childRemoved(adapter); - } + // ConnectionAdapter installs delete task to cause connection model object to delete } public void event(final Event event) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 6dd4124258..793150f9bb 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -299,7 +299,6 @@ public class ServerConnectionDelegate extends ServerDelegate stopAllSubscriptions(conn, dtc); Session ssn = conn.getSession(dtc.getChannel()); ((ServerSession)ssn).setClose(true); - ((ServerSession)ssn).getModelObject().delete(); super.sessionDetach(conn, dtc); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 1f108ec3e9..200be71187 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -739,10 +739,6 @@ public class AMQChannel> _transaction.rollback(); - if(_modelObject != null) - { - _modelObject.delete(); - } try { -- cgit v1.2.1