diff options
| author | Keith Wall <kwall@apache.org> | 2014-05-30 16:24:58 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-05-30 16:24:58 +0000 |
| commit | df196f1d27e75b4f82a35e368406910be16c75c6 (patch) | |
| tree | 65564f8ac3f68153e2a601448a2ea5e2d14ac488 /qpid/java | |
| parent | 51c910379e88ec29a0dd6e5075af4af8441b497e (diff) | |
| download | qpid-python-df196f1d27e75b4f82a35e368406910be16c75c6.tar.gz | |
QPID-5795: [Java Broker] Prevent ConnectionAdapter leak when closing a messaging connection
The leak was due to the fact that nothing was telling the virtualhost to unregister the connection child (#unregisterChild)
when the connection was closed.
* Made ConnectionAdapter responsible for causing its own deletion (when the underlying connection is closed). The call to #deleted() causes the child
to be unregistered from its parent (preventing the leak)
* Removed the now unnecessary _connectionAdapters map from the VH. This needlessly duplicated information already held more generally by the ACO.
* Refactored SessionAdapter in sympathy with CA changes. SessionsAdapters where _not_ being leaked as the session implementation were already telling the model to delete.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1598658 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
5 files changed, 32 insertions, 111 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index 306013a124..c6a2d46267 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -21,9 +21,7 @@ package org.apache.qpid.server.model.adapter; import java.security.Principal; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -40,22 +38,31 @@ import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.SessionModelListener; +import org.apache.qpid.server.util.Action; public final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter> implements Connection<ConnectionAdapter>, SessionModelListener { private AMQConnectionModel _connection; - private final Map<AMQSessionModel, SessionAdapter> _sessionAdapters = - new HashMap<AMQSessionModel, SessionAdapter>(); - private State _state = State.ACTIVE; public ConnectionAdapter(final AMQConnectionModel conn) { super(parentsMap(conn.getVirtualHost()),createAttributes(conn)); _connection = conn; + + conn.addDeleteTask(new Action() + { + @Override + public void performAction(final Object object) + { + conn.removeDeleteTask(this); + deleted(); + } + }); open(); + conn.addSessionListener(this); } @@ -137,23 +144,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection public Collection<Session> getSessions() { - synchronized (_sessionAdapters) - { - return new ArrayList<Session>(_sessionAdapters.values()); - } - } - - /** - * Retrieve the SessionAdapter instance keyed by the AMQSessionModel from this Connection. - * @param session the AMQSessionModel used to index the SessionAdapter. - * @return the requested SessionAdapter. - */ - SessionAdapter getSessionAdapter(AMQSessionModel session) - { - synchronized (_sessionAdapters) - { - return _sessionAdapters.get(session); - } + return getChildren(Session.class); } @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED) @@ -171,19 +162,6 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection @Override - public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) - { - if(clazz == Session.class) - { - return (Collection<C>) getSessions(); - } - else - { - return Collections.emptySet(); - } - } - - @Override public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == Session.class) @@ -236,27 +214,13 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection @Override public void sessionAdded(final AMQSessionModel<?, ?> session) { - synchronized (_sessionAdapters) - { - if(!_sessionAdapters.containsKey(session)) - { - SessionAdapter adapter = new SessionAdapter(this, session); - _sessionAdapters.put(session, adapter); - childAdded(adapter); - } - } + SessionAdapter adapter = new SessionAdapter(this, session); + childAdded(adapter); } @Override public void sessionRemoved(final AMQSessionModel<?, ?> session) { - synchronized (_sessionAdapters) - { - SessionAdapter adapter = _sessionAdapters.remove(session); - if(adapter != null) - { - childRemoved(adapter); - } - } + // SessionAdapter installs delete task to cause session model object to delete } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index 325861e108..689166e951 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.ConsumerListener; +import org.apache.qpid.server.util.Action; final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> implements Session<SessionAdapter> { @@ -67,6 +68,16 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl } }); session.setModelObject(this); + session.addDeleteTask(new Action() + { + @Override + public void performAction(final Object object) + { + session.removeDeleteTask(this); + deleted(); + } + }); + open(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 0509985482..d27cd1c13e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -123,9 +123,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private final EventLogger _eventLogger; - private final Map<AMQConnectionModel, ConnectionAdapter> _connectionAdapters = - new HashMap<AMQConnectionModel, ConnectionAdapter>(); - private final List<VirtualHostAlias> _aliases = new ArrayList<VirtualHostAlias>(); private final AtomicBoolean _deleted = new AtomicBoolean(); private final VirtualHostNode<?> _virtualHostNode; @@ -362,26 +359,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte public Collection<Connection> getConnections() { - synchronized(_connectionAdapters) - { - return new ArrayList<Connection>(_connectionAdapters.values()); - } + return getChildren(Connection.class); } - /** - * Retrieve the ConnectionAdapter instance keyed by the AMQConnectionModel from this VirtualHost. - * @param connection the AMQConnectionModel used to index the ConnectionAdapter. - * @return the requested ConnectionAdapter. - */ - ConnectionAdapter getConnectionAdapter(AMQConnectionModel connection) - { - synchronized (_connectionAdapters) - { - return _connectionAdapters.get(connection); - } - } - @Override public State getState() { @@ -395,11 +376,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @Override public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) { - if(clazz == Connection.class) - { - return (Collection<C>) getConnections(); - } - else if(clazz == VirtualHostAlias.class) + if(clazz == VirtualHostAlias.class) { return (Collection<C>) getAliases(); } @@ -815,41 +792,15 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte { connection.block(); } - ConnectionAdapter adapter = null; - synchronized (_connectionAdapters) - { - if(!_connectionAdapters.containsKey(connection)) - { - adapter = new ConnectionAdapter(connection); - _connectionAdapters.put(connection, adapter); - } - - } - if(adapter != null) - { - childAdded(adapter); - } + Connection c = new ConnectionAdapter(connection); + childAdded(c); } public void connectionUnregistered(final AMQConnectionModel connection) { - ConnectionAdapter adapter; - synchronized (_connectionAdapters) - { - adapter = _connectionAdapters.remove(connection); - - } - - if(adapter != null) - { - // Call getSessions() first to ensure that any SessionAdapter children are cleanly removed and any - // corresponding ConfigurationChangeListener childRemoved() callback is called for child SessionAdapters. - adapter.getSessions(); - - childRemoved(adapter); - } + // ConnectionAdapter installs delete task to cause connection model object to delete } public void event(final Event event) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 6dd4124258..793150f9bb 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -299,7 +299,6 @@ public class ServerConnectionDelegate extends ServerDelegate stopAllSubscriptions(conn, dtc); Session ssn = conn.getSession(dtc.getChannel()); ((ServerSession)ssn).setClose(true); - ((ServerSession)ssn).getModelObject().delete(); super.sessionDetach(conn, dtc); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 1f108ec3e9..200be71187 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -739,10 +739,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>> _transaction.rollback(); - if(_modelObject != null) - { - _modelObject.delete(); - } try { |
