summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-05-30 16:24:58 +0000
committerKeith Wall <kwall@apache.org>2014-05-30 16:24:58 +0000
commitdf196f1d27e75b4f82a35e368406910be16c75c6 (patch)
tree65564f8ac3f68153e2a601448a2ea5e2d14ac488 /qpid/java
parent51c910379e88ec29a0dd6e5075af4af8441b497e (diff)
downloadqpid-python-df196f1d27e75b4f82a35e368406910be16c75c6.tar.gz
QPID-5795: [Java Broker] Prevent ConnectionAdapter leak when closing a messaging connection
The leak was due to the fact that nothing was telling the virtualhost to unregister the connection child (#unregisterChild) when the connection was closed. * Made ConnectionAdapter responsible for causing its own deletion (when the underlying connection is closed). The call to #deleted() causes the child to be unregistered from its parent (preventing the leak) * Removed the now unnecessary _connectionAdapters map from the VH. This needlessly duplicated information already held more generally by the ACO. * Refactored SessionAdapter in sympathy with CA changes. SessionsAdapters where _not_ being leaked as the session implementation were already telling the model to delete. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1598658 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java68
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java59
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java1
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java4
5 files changed, 32 insertions, 111 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index 306013a124..c6a2d46267 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -21,9 +21,7 @@
package org.apache.qpid.server.model.adapter;
import java.security.Principal;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -40,22 +38,31 @@ import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.SessionModelListener;
+import org.apache.qpid.server.util.Action;
public final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter> implements Connection<ConnectionAdapter>,
SessionModelListener
{
private AMQConnectionModel _connection;
- private final Map<AMQSessionModel, SessionAdapter> _sessionAdapters =
- new HashMap<AMQSessionModel, SessionAdapter>();
-
private State _state = State.ACTIVE;
public ConnectionAdapter(final AMQConnectionModel conn)
{
super(parentsMap(conn.getVirtualHost()),createAttributes(conn));
_connection = conn;
+
+ conn.addDeleteTask(new Action()
+ {
+ @Override
+ public void performAction(final Object object)
+ {
+ conn.removeDeleteTask(this);
+ deleted();
+ }
+ });
open();
+
conn.addSessionListener(this);
}
@@ -137,23 +144,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
public Collection<Session> getSessions()
{
- synchronized (_sessionAdapters)
- {
- return new ArrayList<Session>(_sessionAdapters.values());
- }
- }
-
- /**
- * Retrieve the SessionAdapter instance keyed by the AMQSessionModel from this Connection.
- * @param session the AMQSessionModel used to index the SessionAdapter.
- * @return the requested SessionAdapter.
- */
- SessionAdapter getSessionAdapter(AMQSessionModel session)
- {
- synchronized (_sessionAdapters)
- {
- return _sessionAdapters.get(session);
- }
+ return getChildren(Session.class);
}
@StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
@@ -171,19 +162,6 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
@Override
- public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
- {
- if(clazz == Session.class)
- {
- return (Collection<C>) getSessions();
- }
- else
- {
- return Collections.emptySet();
- }
- }
-
- @Override
public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
{
if(childClass == Session.class)
@@ -236,27 +214,13 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
@Override
public void sessionAdded(final AMQSessionModel<?, ?> session)
{
- synchronized (_sessionAdapters)
- {
- if(!_sessionAdapters.containsKey(session))
- {
- SessionAdapter adapter = new SessionAdapter(this, session);
- _sessionAdapters.put(session, adapter);
- childAdded(adapter);
- }
- }
+ SessionAdapter adapter = new SessionAdapter(this, session);
+ childAdded(adapter);
}
@Override
public void sessionRemoved(final AMQSessionModel<?, ?> session)
{
- synchronized (_sessionAdapters)
- {
- SessionAdapter adapter = _sessionAdapters.remove(session);
- if(adapter != null)
- {
- childRemoved(adapter);
- }
- }
+ // SessionAdapter installs delete task to cause session model object to delete
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
index 325861e108..689166e951 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConsumerListener;
+import org.apache.qpid.server.util.Action;
final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> implements Session<SessionAdapter>
{
@@ -67,6 +68,16 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl
}
});
session.setModelObject(this);
+ session.addDeleteTask(new Action()
+ {
+ @Override
+ public void performAction(final Object object)
+ {
+ session.removeDeleteTask(this);
+ deleted();
+ }
+ });
+
open();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 0509985482..d27cd1c13e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -123,9 +123,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private final EventLogger _eventLogger;
- private final Map<AMQConnectionModel, ConnectionAdapter> _connectionAdapters =
- new HashMap<AMQConnectionModel, ConnectionAdapter>();
-
private final List<VirtualHostAlias> _aliases = new ArrayList<VirtualHostAlias>();
private final AtomicBoolean _deleted = new AtomicBoolean();
private final VirtualHostNode<?> _virtualHostNode;
@@ -362,26 +359,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
public Collection<Connection> getConnections()
{
- synchronized(_connectionAdapters)
- {
- return new ArrayList<Connection>(_connectionAdapters.values());
- }
+ return getChildren(Connection.class);
}
- /**
- * Retrieve the ConnectionAdapter instance keyed by the AMQConnectionModel from this VirtualHost.
- * @param connection the AMQConnectionModel used to index the ConnectionAdapter.
- * @return the requested ConnectionAdapter.
- */
- ConnectionAdapter getConnectionAdapter(AMQConnectionModel connection)
- {
- synchronized (_connectionAdapters)
- {
- return _connectionAdapters.get(connection);
- }
- }
-
@Override
public State getState()
{
@@ -395,11 +376,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
- if(clazz == Connection.class)
- {
- return (Collection<C>) getConnections();
- }
- else if(clazz == VirtualHostAlias.class)
+ if(clazz == VirtualHostAlias.class)
{
return (Collection<C>) getAliases();
}
@@ -815,41 +792,15 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
{
connection.block();
}
- ConnectionAdapter adapter = null;
- synchronized (_connectionAdapters)
- {
- if(!_connectionAdapters.containsKey(connection))
- {
- adapter = new ConnectionAdapter(connection);
- _connectionAdapters.put(connection, adapter);
- }
-
- }
- if(adapter != null)
- {
- childAdded(adapter);
- }
+ Connection c = new ConnectionAdapter(connection);
+ childAdded(c);
}
public void connectionUnregistered(final AMQConnectionModel connection)
{
- ConnectionAdapter adapter;
- synchronized (_connectionAdapters)
- {
- adapter = _connectionAdapters.remove(connection);
-
- }
-
- if(adapter != null)
- {
- // Call getSessions() first to ensure that any SessionAdapter children are cleanly removed and any
- // corresponding ConfigurationChangeListener childRemoved() callback is called for child SessionAdapters.
- adapter.getSessions();
-
- childRemoved(adapter);
- }
+ // ConnectionAdapter installs delete task to cause connection model object to delete
}
public void event(final Event event)
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 6dd4124258..793150f9bb 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -299,7 +299,6 @@ public class ServerConnectionDelegate extends ServerDelegate
stopAllSubscriptions(conn, dtc);
Session ssn = conn.getSession(dtc.getChannel());
((ServerSession)ssn).setClose(true);
- ((ServerSession)ssn).getModelObject().delete();
super.sessionDetach(conn, dtc);
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 1f108ec3e9..200be71187 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -739,10 +739,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
_transaction.rollback();
- if(_modelObject != null)
- {
- _modelObject.delete();
- }
try
{