diff options
| author | Fraser Adams <fadams@apache.org> | 2013-04-26 14:25:52 +0000 |
|---|---|---|
| committer | Fraser Adams <fadams@apache.org> | 2013-04-26 14:25:52 +0000 |
| commit | c018fd02dcfb0b5893eea6541b7e59799e3bc912 (patch) | |
| tree | a45b1c69ec3c7fb883c041c21b5746cb9a3093cc /java/broker | |
| parent | 107182b91c0f1871e41563236d55827412fe9d02 (diff) | |
| download | qpid-python-c018fd02dcfb0b5893eea6541b7e59799e3bc912.tar.gz | |
QPID-4760: Associate Java Broker QueueAdapter and SessionAdapter via ConsumerAdapter
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1476219 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
5 files changed, 134 insertions, 9 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index 84f99e1f17..2ecd9a6431 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -68,20 +68,37 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection { if(!actualSessions.contains(session)) { - _sessionAdapters.remove(session); + SessionAdapter adapter = _sessionAdapters.remove(session); + childRemoved(adapter); // Trigger corresponding ConfigurationChangeListener childRemoved() callback. } } for(AMQSessionModel session : actualSessions) { if(!_sessionAdapters.containsKey(session)) { - _sessionAdapters.put(session, new SessionAdapter(session, getTaskExecutor())); + SessionAdapter adapter = new SessionAdapter(session, getTaskExecutor()); + _sessionAdapters.put(session, adapter); + childAdded(adapter); // Trigger corresponding ConfigurationChangeListener childAdded() callback. } } return new ArrayList<Session>(_sessionAdapters.values()); } } + /** + * Retrieve the SessionAdapter instance keyed by the AMQSessionModel from this Connection. + * @param session the AMQSessionModel used to index the SessionAdapter. + * @return the requested SessionAdapter. + */ + SessionAdapter getSessionAdapter(AMQSessionModel session) + { + synchronized (_sessionAdapters) + { + getSessions(); // Call getSessions() first to ensure _sessionAdapters state is up to date with actualSessions. + return _sessionAdapters.get(session); + } + } + public void delete() { try diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java index e6d3fab2f8..4633605256 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java @@ -37,9 +37,11 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer { private final Subscription _subscription; private final QueueAdapter _queue; + private final SessionAdapter _session; private final ConsumerStatistics _statistics; - public ConsumerAdapter(final QueueAdapter queueAdapter, final Subscription subscription) + public ConsumerAdapter(final QueueAdapter queueAdapter, final SessionAdapter sessionAdapter, + final Subscription subscription) { super(UUIDGenerator.generateConsumerUUID(queueAdapter.getVirtualHost().getName(), queueAdapter.getName(), @@ -48,6 +50,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer subscription.getConsumerName()), queueAdapter.getTaskExecutor()); _subscription = subscription; _queue = queueAdapter; + _session = sessionAdapter; _statistics = new ConsumerStatistics(); //TODO } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index f3ddf32e5a..8ac869900c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -34,13 +34,17 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFinder; import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; +import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.*; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.util.MapValueConverter; @@ -91,6 +95,38 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs _queue.setNotificationListener(this); } + /** + * Helper method to retrieve the SessionAdapter keyed by the AMQSessionModel. + * This method first finds the ConnectionAdapter associated with the Session from this QueueAdapter's parent vhost + * then it does a lookup on that to find the SessionAdapter keyed by the requested AMQSessionModel instance. + * @param session the AMQSessionModel used to index the SessionAdapter. + * @return the requested SessionAdapter or null if it can't be found. + */ + private SessionAdapter getSessionAdapter(AMQSessionModel session) + { + // Retrieve the ConnectionModel associated with the SessionModel as a key to lookup the ConnectionAdapter. + AMQConnectionModel connectionKey = session.getConnectionModel(); + + // Lookup the ConnectionAdapter, from which we should be able to retrieve the SessionAdapter we really want. + ConnectionAdapter connectionAdapter = _vhost.getConnectionAdapter(connectionKey); + if (connectionAdapter == null) + { + return null; // If we can't find an associated ConnectionAdapter the SessionAdapter is a lost cause. + } + else + { // With a good ConnectionAdapter we can finally try to find the SessionAdapter we are actually looking for. + SessionAdapter sessionAdapter = connectionAdapter.getSessionAdapter(session); + if (sessionAdapter == null) + { + return null; // If the SessionAdapter isn't associated with the selected ConnectionAdapter give up. + } + else + { + return sessionAdapter; + } + } + } + private void populateConsumers() { Collection<org.apache.qpid.server.subscription.Subscription> actualSubscriptions = _queue.getConsumers(); @@ -102,7 +138,13 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs { if(!_consumerAdapters.containsKey(subscription)) { - _consumerAdapters.put(subscription, new ConsumerAdapter(this, subscription)); + SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel()); + ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, subscription); + _consumerAdapters.put(subscription, adapter); + if (sessionAdapter != null) + { // Register ConsumerAdapter with the SessionAdapter. + sessionAdapter.subscriptionRegistered(subscription, adapter); + } } } } @@ -571,9 +613,13 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs { if(!_consumerAdapters.containsKey(subscription)) { - adapter = new ConsumerAdapter(this, subscription); - _consumerAdapters.put(subscription,adapter); - // TODO - register with session + SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel()); + adapter = new ConsumerAdapter(this, sessionAdapter, subscription); + _consumerAdapters.put(subscription, adapter); + if (sessionAdapter != null) + { // Register ConsumerAdapter with the SessionAdapter. + sessionAdapter.subscriptionRegistered(subscription, adapter); + } } } if(adapter != null) @@ -589,10 +635,14 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs synchronized (_consumerAdapters) { adapter = _consumerAdapters.remove(subscription); - // TODO - register with session } if(adapter != null) { + SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel()); + if (sessionAdapter != null) + { // Unregister ConsumerAdapter with the SessionAdapter. + sessionAdapter.subscriptionUnregistered(subscription); + } childRemoved(adapter); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index 2fffdb32f8..550e8cecd6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -21,8 +21,10 @@ package org.apache.qpid.server.model.adapter; import java.security.AccessControlException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -34,6 +36,7 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -44,6 +47,7 @@ final class SessionAdapter extends AbstractAdapter implements Session private AMQSessionModel _session; private SessionStatistics _statistics; + private Map<Subscription, ConsumerAdapter> _consumerAdapters = new HashMap<Subscription, ConsumerAdapter>(); public SessionAdapter(final AMQSessionModel session, TaskExecutor taskExecutor) { @@ -54,7 +58,10 @@ final class SessionAdapter extends AbstractAdapter implements Session public Collection<Consumer> getSubscriptions() { - return null; //TODO + synchronized (_consumerAdapters) + { + return new ArrayList<Consumer>(_consumerAdapters.values()); + } } public Collection<Publisher> getPublishers() @@ -111,6 +118,37 @@ final class SessionAdapter extends AbstractAdapter implements Session return 0; //TODO } + /** + * Register a ConsumerAdapter (Subscription) with this Session keyed by the Subscription. + * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter. + * @param adapter the registered ConsumerAdapter. + */ + void subscriptionRegistered(Subscription subscription, ConsumerAdapter adapter) + { + synchronized (_consumerAdapters) + { + _consumerAdapters.put(subscription, adapter); + } + childAdded(adapter); + } + + /** + * Unregister a ConsumerAdapter (Subscription) with this Session keyed by the Subscription. + * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter. + */ + void subscriptionUnregistered(Subscription subscription) + { + ConsumerAdapter adapter = null; + synchronized (_consumerAdapters) + { + adapter = _consumerAdapters.remove(subscription); + } + if (adapter != null) + { + childRemoved(adapter); + } + } + @Override public Collection<String> getAttributeNames() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 2a66763272..97bb492484 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -214,6 +214,19 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } + /** + * Retrieve the ConnectionAdapter instance keyed by the AMQConnectionModel from this VirtualHost. + * @param connection the AMQConnectionModel used to index the ConnectionAdapter. + * @return the requested ConnectionAdapter. + */ + ConnectionAdapter getConnectionAdapter(AMQConnectionModel connection) + { + synchronized (_connectionAdapters) + { + return _connectionAdapters.get(connection); + } + } + public Collection<Queue> getQueues() { synchronized(_queueAdapters) @@ -644,6 +657,10 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual if(adapter != null) { + // Call getSessions() first to ensure that any SessionAdapter children are cleanly removed and any + // corresponding ConfigurationChangeListener childRemoved() callback is called for child SessionAdapters. + adapter.getSessions(); + childRemoved(adapter); } } |
