summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorFraser Adams <fadams@apache.org>2013-04-26 14:25:52 +0000
committerFraser Adams <fadams@apache.org>2013-04-26 14:25:52 +0000
commitc018fd02dcfb0b5893eea6541b7e59799e3bc912 (patch)
treea45b1c69ec3c7fb883c041c21b5746cb9a3093cc /java/broker
parent107182b91c0f1871e41563236d55827412fe9d02 (diff)
downloadqpid-python-c018fd02dcfb0b5893eea6541b7e59799e3bc912.tar.gz
QPID-4760: Associate Java Broker QueueAdapter and SessionAdapter via ConsumerAdapter
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1476219 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java60
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java40
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java17
5 files changed, 134 insertions, 9 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index 84f99e1f17..2ecd9a6431 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -68,20 +68,37 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection
{
if(!actualSessions.contains(session))
{
- _sessionAdapters.remove(session);
+ SessionAdapter adapter = _sessionAdapters.remove(session);
+ childRemoved(adapter); // Trigger corresponding ConfigurationChangeListener childRemoved() callback.
}
}
for(AMQSessionModel session : actualSessions)
{
if(!_sessionAdapters.containsKey(session))
{
- _sessionAdapters.put(session, new SessionAdapter(session, getTaskExecutor()));
+ SessionAdapter adapter = new SessionAdapter(session, getTaskExecutor());
+ _sessionAdapters.put(session, adapter);
+ childAdded(adapter); // Trigger corresponding ConfigurationChangeListener childAdded() callback.
}
}
return new ArrayList<Session>(_sessionAdapters.values());
}
}
+ /**
+ * Retrieve the SessionAdapter instance keyed by the AMQSessionModel from this Connection.
+ * @param session the AMQSessionModel used to index the SessionAdapter.
+ * @return the requested SessionAdapter.
+ */
+ SessionAdapter getSessionAdapter(AMQSessionModel session)
+ {
+ synchronized (_sessionAdapters)
+ {
+ getSessions(); // Call getSessions() first to ensure _sessionAdapters state is up to date with actualSessions.
+ return _sessionAdapters.get(session);
+ }
+ }
+
public void delete()
{
try
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
index e6d3fab2f8..4633605256 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
@@ -37,9 +37,11 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer
{
private final Subscription _subscription;
private final QueueAdapter _queue;
+ private final SessionAdapter _session;
private final ConsumerStatistics _statistics;
- public ConsumerAdapter(final QueueAdapter queueAdapter, final Subscription subscription)
+ public ConsumerAdapter(final QueueAdapter queueAdapter, final SessionAdapter sessionAdapter,
+ final Subscription subscription)
{
super(UUIDGenerator.generateConsumerUUID(queueAdapter.getVirtualHost().getName(),
queueAdapter.getName(),
@@ -48,6 +50,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer
subscription.getConsumerName()), queueAdapter.getTaskExecutor());
_subscription = subscription;
_queue = queueAdapter;
+ _session = sessionAdapter;
_statistics = new ConsumerStatistics();
//TODO
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
index f3ddf32e5a..8ac869900c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -34,13 +34,17 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFinder;
import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
+import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.util.MapValueConverter;
@@ -91,6 +95,38 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
_queue.setNotificationListener(this);
}
+ /**
+ * Helper method to retrieve the SessionAdapter keyed by the AMQSessionModel.
+ * This method first finds the ConnectionAdapter associated with the Session from this QueueAdapter's parent vhost
+ * then it does a lookup on that to find the SessionAdapter keyed by the requested AMQSessionModel instance.
+ * @param session the AMQSessionModel used to index the SessionAdapter.
+ * @return the requested SessionAdapter or null if it can't be found.
+ */
+ private SessionAdapter getSessionAdapter(AMQSessionModel session)
+ {
+ // Retrieve the ConnectionModel associated with the SessionModel as a key to lookup the ConnectionAdapter.
+ AMQConnectionModel connectionKey = session.getConnectionModel();
+
+ // Lookup the ConnectionAdapter, from which we should be able to retrieve the SessionAdapter we really want.
+ ConnectionAdapter connectionAdapter = _vhost.getConnectionAdapter(connectionKey);
+ if (connectionAdapter == null)
+ {
+ return null; // If we can't find an associated ConnectionAdapter the SessionAdapter is a lost cause.
+ }
+ else
+ { // With a good ConnectionAdapter we can finally try to find the SessionAdapter we are actually looking for.
+ SessionAdapter sessionAdapter = connectionAdapter.getSessionAdapter(session);
+ if (sessionAdapter == null)
+ {
+ return null; // If the SessionAdapter isn't associated with the selected ConnectionAdapter give up.
+ }
+ else
+ {
+ return sessionAdapter;
+ }
+ }
+ }
+
private void populateConsumers()
{
Collection<org.apache.qpid.server.subscription.Subscription> actualSubscriptions = _queue.getConsumers();
@@ -102,7 +138,13 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
{
if(!_consumerAdapters.containsKey(subscription))
{
- _consumerAdapters.put(subscription, new ConsumerAdapter(this, subscription));
+ SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
+ _consumerAdapters.put(subscription, adapter);
+ if (sessionAdapter != null)
+ { // Register ConsumerAdapter with the SessionAdapter.
+ sessionAdapter.subscriptionRegistered(subscription, adapter);
+ }
}
}
}
@@ -571,9 +613,13 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
{
if(!_consumerAdapters.containsKey(subscription))
{
- adapter = new ConsumerAdapter(this, subscription);
- _consumerAdapters.put(subscription,adapter);
- // TODO - register with session
+ SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
+ _consumerAdapters.put(subscription, adapter);
+ if (sessionAdapter != null)
+ { // Register ConsumerAdapter with the SessionAdapter.
+ sessionAdapter.subscriptionRegistered(subscription, adapter);
+ }
}
}
if(adapter != null)
@@ -589,10 +635,14 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
synchronized (_consumerAdapters)
{
adapter = _consumerAdapters.remove(subscription);
- // TODO - register with session
}
if(adapter != null)
{
+ SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ if (sessionAdapter != null)
+ { // Unregister ConsumerAdapter with the SessionAdapter.
+ sessionAdapter.subscriptionUnregistered(subscription);
+ }
childRemoved(adapter);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
index 2fffdb32f8..550e8cecd6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
@@ -21,8 +21,10 @@
package org.apache.qpid.server.model.adapter;
import java.security.AccessControlException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -34,6 +36,7 @@ import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -44,6 +47,7 @@ final class SessionAdapter extends AbstractAdapter implements Session
private AMQSessionModel _session;
private SessionStatistics _statistics;
+ private Map<Subscription, ConsumerAdapter> _consumerAdapters = new HashMap<Subscription, ConsumerAdapter>();
public SessionAdapter(final AMQSessionModel session, TaskExecutor taskExecutor)
{
@@ -54,7 +58,10 @@ final class SessionAdapter extends AbstractAdapter implements Session
public Collection<Consumer> getSubscriptions()
{
- return null; //TODO
+ synchronized (_consumerAdapters)
+ {
+ return new ArrayList<Consumer>(_consumerAdapters.values());
+ }
}
public Collection<Publisher> getPublishers()
@@ -111,6 +118,37 @@ final class SessionAdapter extends AbstractAdapter implements Session
return 0; //TODO
}
+ /**
+ * Register a ConsumerAdapter (Subscription) with this Session keyed by the Subscription.
+ * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter.
+ * @param adapter the registered ConsumerAdapter.
+ */
+ void subscriptionRegistered(Subscription subscription, ConsumerAdapter adapter)
+ {
+ synchronized (_consumerAdapters)
+ {
+ _consumerAdapters.put(subscription, adapter);
+ }
+ childAdded(adapter);
+ }
+
+ /**
+ * Unregister a ConsumerAdapter (Subscription) with this Session keyed by the Subscription.
+ * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter.
+ */
+ void subscriptionUnregistered(Subscription subscription)
+ {
+ ConsumerAdapter adapter = null;
+ synchronized (_consumerAdapters)
+ {
+ adapter = _consumerAdapters.remove(subscription);
+ }
+ if (adapter != null)
+ {
+ childRemoved(adapter);
+ }
+ }
+
@Override
public Collection<String> getAttributeNames()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index 2a66763272..97bb492484 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -214,6 +214,19 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
}
+ /**
+ * Retrieve the ConnectionAdapter instance keyed by the AMQConnectionModel from this VirtualHost.
+ * @param connection the AMQConnectionModel used to index the ConnectionAdapter.
+ * @return the requested ConnectionAdapter.
+ */
+ ConnectionAdapter getConnectionAdapter(AMQConnectionModel connection)
+ {
+ synchronized (_connectionAdapters)
+ {
+ return _connectionAdapters.get(connection);
+ }
+ }
+
public Collection<Queue> getQueues()
{
synchronized(_queueAdapters)
@@ -644,6 +657,10 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
if(adapter != null)
{
+ // Call getSessions() first to ensure that any SessionAdapter children are cleanly removed and any
+ // corresponding ConfigurationChangeListener childRemoved() callback is called for child SessionAdapters.
+ adapter.getSessions();
+
childRemoved(adapter);
}
}