summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-03-08 18:56:42 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-03-08 18:56:42 +0000
commitb2023145c2b88ee458429663536cbab7ddd8f3b0 (patch)
tree259f7ed1e2372025c7a65338abc3a58ef6b88e74 /qpid/java/broker-plugins
parent19b2671cbd4af77ac52c222605c09b06cab7ced6 (diff)
downloadqpid-python-b2023145c2b88ee458429663536cbab7ddd8f3b0.tar.gz
QPID-5617 : [Java Broker] restore or implement child added/removed notifications for configured objects within the vhost
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1575591 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java12
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java35
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java90
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java14
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java124
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java43
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java13
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java12
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java30
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java4
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java41
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java10
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java22
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java89
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java12
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java6
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java8
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java4
22 files changed, 480 insertions, 110 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index eeafb30642..a3fabf076c 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.EventLogger;
@@ -31,7 +32,6 @@ import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -66,7 +66,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
private final Map<String, Object> _arguments;
private int _deferredMessageCredit;
private long _deferredSizeCredit;
- private Consumer _consumer;
+ private ConsumerImpl _consumer;
public ConsumerTarget_0_10(ServerSession session,
@@ -90,7 +90,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
_name = name;
}
- public Consumer getConsumer()
+ public ConsumerImpl getConsumer()
{
return _consumer;
}
@@ -105,7 +105,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
boolean closed = false;
State state = getState();
- final Consumer consumer = getConsumer();
+ final ConsumerImpl consumer = getConsumer();
if(consumer != null)
{
consumer.getSendLock();
@@ -569,13 +569,13 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
@Override
- public void consumerAdded(final Consumer sub)
+ public void consumerAdded(final ConsumerImpl sub)
{
_consumer = sub;
}
@Override
- public void consumerRemoved(final Consumer sub)
+ public void consumerRemoved(final ConsumerImpl sub)
{
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index a3a80415ac..5e899aa635 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -80,6 +81,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList =
new CopyOnWriteArrayList<Action<? super ServerConnection>>();
+ private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
+ new CopyOnWriteArrayList<SessionModelListener>();
+
private volatile boolean _stopped;
public ServerConnection(final long connectionId, Broker broker)
@@ -383,6 +387,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
public synchronized void registerSession(final Session ssn)
{
super.registerSession(ssn);
+ sessionAdded((ServerSession)ssn);
if(_blocking)
{
((ServerSession)ssn).block();
@@ -392,6 +397,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
@Override
public synchronized void removeSession(final Session ssn)
{
+ sessionRemoved((ServerSession)ssn);
super.removeSession(ssn);
}
@@ -552,6 +558,35 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
}
@Override
+ public void addSessionListener(final SessionModelListener listener)
+ {
+ _sessionListeners.add(listener);
+ }
+
+ @Override
+ public void removeSessionListener(final SessionModelListener listener)
+ {
+ _sessionListeners.remove(listener);
+ }
+
+ private void sessionAdded(final AMQSessionModel<?,?> session)
+ {
+ for(SessionModelListener l : _sessionListeners)
+ {
+ l.sessionAdded(session);
+ }
+ }
+
+ private void sessionRemoved(final AMQSessionModel<?,?> session)
+ {
+ for(SessionModelListener l : _sessionListeners)
+ {
+ l.sessionRemoved(session);
+ }
+ }
+
+
+ @Override
public String getClientVersion()
{
return getConnectionDelegate().getClientVersion();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index c2eacfe6e8..0bb3008d13 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -45,6 +45,12 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
@@ -108,6 +114,9 @@ public class ServerSession extends Session
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
+ private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
+ private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
+
public static interface MessageDispositionChangeListener
{
@@ -133,6 +142,7 @@ public class ServerSession extends Session
private final AtomicLong _txnCount = new AtomicLong(0);
private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
+ private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>();
@@ -458,6 +468,18 @@ public class ServerSession extends Session
_subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub);
}
+
+ public void register(final ConsumerImpl consumerImpl)
+ {
+ if(consumerImpl instanceof Consumer<?>)
+ {
+ final Consumer<?> consumer = (Consumer<?>) consumerImpl;
+ _consumers.add(consumer);
+ consumer.addChangeListener(_consumerClosedListener);
+ consumerAdded(consumer);
+ }
+ }
+
public ConsumerTarget_0_10 getSubscription(String destination)
{
return _subscriptions.get(destination == null ? NULL_DESTINATION : destination);
@@ -949,6 +971,41 @@ public class ServerSession extends Session
}
@Override
+ public Collection<Consumer<?>> getConsumers()
+ {
+
+ return Collections.unmodifiableCollection(_consumers);
+ }
+
+ @Override
+ public void addConsumerListener(final ConsumerListener listener)
+ {
+ _consumerListeners.add(listener);
+ }
+
+ @Override
+ public void removeConsumerListener(final ConsumerListener listener)
+ {
+ _consumerListeners.remove(listener);
+ }
+
+ private void consumerAdded(Consumer<?> consumer)
+ {
+ for(ConsumerListener l : _consumerListeners)
+ {
+ l.consumerAdded(consumer);
+ }
+ }
+
+ private void consumerRemoved(Consumer<?> consumer)
+ {
+ for(ConsumerListener l : _consumerListeners)
+ {
+ l.consumerRemoved(consumer);
+ }
+ }
+
+ @Override
public int compareTo(ServerSession o)
{
return getId().compareTo(o.getId());
@@ -966,4 +1023,37 @@ public class ServerSession extends Session
}
}
}
+
+ private class ConsumerClosedListener implements ConfigurationChangeListener
+ {
+ @Override
+ public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState)
+ {
+ if(newState == org.apache.qpid.server.model.State.DELETED)
+ {
+ consumerRemoved((Consumer<?>)object);
+ }
+ }
+
+ @Override
+ public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void attributeSet(final ConfiguredObject object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 2593c66191..040be92ceb 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.ExclusivityPolicy;
@@ -51,7 +52,6 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -234,25 +234,25 @@ public class ServerSessionDelegate extends SessionDelegate
((ServerSession)session).register(destination, target);
try
{
- EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
+ EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED)
{
- options.add(Consumer.Option.ACQUIRES);
+ options.add(ConsumerImpl.Option.ACQUIRES);
}
if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
- options.add(Consumer.Option.SEES_REQUEUES);
+ options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
if(method.getExclusive())
{
- options.add(Consumer.Option.EXCLUSIVE);
+ options.add(ConsumerImpl.Option.EXCLUSIVE);
}
- Consumer sub =
+ ((ServerSession)session).register(
queue.addConsumer(target,
filterManager,
MessageTransferMessage.class,
destination,
- options);
+ options));
}
catch (AMQQueue.ExistingExclusiveConsumer existing)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 780e7ad199..baf5eceef7 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -33,6 +33,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.Filterable;
@@ -66,7 +67,12 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.protocol.CapacityChecker;
+import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -76,7 +82,6 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
@@ -173,6 +178,9 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
private final ImmediateAction _immediateAction = new ImmediateAction();
private Subject _subject;
+ private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+ private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
+ private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
public AMQChannel(T session, int channelId, final MessageStore messageStore)
@@ -526,7 +534,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
- public Consumer getSubscription(AMQShortString tag)
+ public ConsumerImpl getSubscription(AMQShortString tag)
{
final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
return target == null ? null : target.getConsumer();
@@ -545,7 +553,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
* @param exclusive Flag requesting exclusive access to the queue
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
*
- * @throws AMQException if something goes wrong
+ * @throws org.apache.qpid.AMQException if something goes wrong
*/
public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
FieldTable filters, boolean exclusive, boolean noLocal)
@@ -564,7 +572,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
ConsumerTarget_0_8 target;
- EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
+ EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
{
@@ -573,19 +581,19 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
else if(acks)
{
target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
- options.add(Consumer.Option.ACQUIRES);
- options.add(Consumer.Option.SEES_REQUEUES);
+ options.add(ConsumerImpl.Option.ACQUIRES);
+ options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
else
{
target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
- options.add(Consumer.Option.ACQUIRES);
- options.add(Consumer.Option.SEES_REQUEUES);
+ options.add(ConsumerImpl.Option.ACQUIRES);
+ options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
if(exclusive)
{
- options.add(Consumer.Option.EXCLUSIVE);
+ options.add(ConsumerImpl.Option.EXCLUSIVE);
}
@@ -615,12 +623,19 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
});
}
- Consumer sub =
+ ConsumerImpl sub =
source.addConsumer(target,
filterManager,
- AMQMessage.class,
- AMQShortString.toString(tag),
- options);
+ AMQMessage.class,
+ AMQShortString.toString(tag),
+ options);
+ if(sub instanceof Consumer<?>)
+ {
+ final Consumer<?> modelConsumer = (Consumer<?>) sub;
+ consumerAdded(modelConsumer);
+ modelConsumer.addChangeListener(_consumerClosedListener);
+ _consumers.add(modelConsumer);
+ }
}
catch (AccessControlException e)
{
@@ -659,15 +674,19 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
- Consumer sub = target == null ? null : target.getConsumer();
+ ConsumerImpl sub = target == null ? null : target.getConsumer();
if (sub != null)
{
sub.close();
+ if(sub instanceof Consumer<?>)
+ {
+ _consumers.remove(sub);
+ }
return true;
}
else
{
- _logger.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered.");
+ _logger.warn("Attempt to unsubscribe consumer with tag '" + consumerTag + "' which is not registered.");
}
return false;
}
@@ -735,7 +754,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
_logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
- Consumer sub = me.getValue().getConsumer();
+ ConsumerImpl sub = me.getValue().getConsumer();
if(sub != null)
{
@@ -754,7 +773,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
* delivery tag)
* @param consumer The consumer that is to acknowledge this message.
*/
- public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer)
+ public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, ConsumerImpl consumer)
{
if (_logger.isDebugEnabled())
{
@@ -1126,7 +1145,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
for(MessageInstance entry : _resendList)
{
- Consumer sub = entry.getDeliveredConsumer();
+ ConsumerImpl sub = entry.getDeliveredConsumer();
if(sub == null || sub.isClosed())
{
entry.release();
@@ -1199,7 +1218,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
+ public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag)
{
addUnacknowledgedMessage(entry, deliveryTag, sub);
}
@@ -1658,4 +1677,71 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
return _tag2SubscriptionTargetMap.size();
}
+
+ @Override
+ public Collection<Consumer<?>> getConsumers()
+ {
+ return Collections.unmodifiableCollection(_consumers);
+ }
+
+ private class ConsumerClosedListener implements ConfigurationChangeListener
+ {
+ @Override
+ public void stateChanged(final ConfiguredObject object, final State oldState, final State newState)
+ {
+ if(newState == State.DELETED)
+ {
+ consumerRemoved((Consumer<?>)object);
+ }
+ }
+
+ @Override
+ public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void attributeSet(final ConfiguredObject object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+
+ }
+ }
+
+ private void consumerAdded(final Consumer<?> consumer)
+ {
+ for(ConsumerListener l : _consumerListeners)
+ {
+ l.consumerAdded(consumer);
+ }
+ }
+
+ private void consumerRemoved(final Consumer<?> consumer)
+ {
+ for(ConsumerListener l : _consumerListeners)
+ {
+ l.consumerRemoved(consumer);
+ }
+ }
+
+ @Override
+ public void addConsumerListener(ConsumerListener listener)
+ {
+ _consumerListeners.add(listener);
+ }
+
+ @Override
+ public void removeConsumerListener(ConsumerListener listener)
+ {
+ _consumerListeners.remove(listener);
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 2ebcde199b..a86530fe0e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -56,10 +56,13 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.connection.ConnectionPrincipal;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
@@ -73,7 +76,6 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -102,6 +104,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap =
new HashMap<Integer, AMQChannel<AMQProtocolEngine>>();
+ private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
+ new CopyOnWriteArrayList<SessionModelListener>();
@SuppressWarnings("unchecked")
private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
@@ -759,7 +763,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
synchronized (_channelMap)
{
_channelMap.put(channel.getChannelId(), channel);
-
+ sessionAdded(channel);
if(_blocking)
{
channel.block();
@@ -773,6 +777,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
+ private void sessionAdded(final AMQSessionModel<?,?> session)
+ {
+ for(SessionModelListener l : _sessionListeners)
+ {
+ l.sessionAdded(session);
+ }
+ }
+
+ private void sessionRemoved(final AMQSessionModel<?,?> session)
+ {
+ for(SessionModelListener l : _sessionListeners)
+ {
+ l.sessionRemoved(session);
+ }
+ }
+
public Long getMaximumNumberOfChannels()
{
return _maxNoOfChannels;
@@ -844,15 +864,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
*/
public void removeChannel(int channelId)
{
+ AMQChannel<AMQProtocolEngine> session;
synchronized (_channelMap)
{
- _channelMap.remove(channelId);
-
+ session = _channelMap.remove(channelId);
if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
{
_cachedChannels[channelId] = null;
}
}
+ sessionRemoved(session);
}
/**
@@ -1509,6 +1530,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return String.valueOf(getContextKey());
}
+ @Override
+ public void addSessionListener(final SessionModelListener listener)
+ {
+ _sessionListeners.add(listener);
+ }
+
+ @Override
+ public void removeSessionListener(final SessionModelListener listener)
+ {
+ _sessionListeners.remove(listener);
+ }
+
public void setDeferFlush(boolean deferFlush)
{
_deferFlush = deferFlush;
@@ -1525,7 +1558,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
@Override
- public void deliverToClient(final Consumer sub, final ServerMessage message,
+ public void deliverToClient(final ConsumerImpl sub, final ServerMessage message,
final InstanceProperties props, final long deliveryTag)
{
registerMessageDelivered(message.getSize());
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
index 9f8799f68e..fa26a73f93 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
@@ -20,13 +20,12 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.consumer.Consumer;
public interface ClientDeliveryMethod
{
- void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props,
+ void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props,
final long deliveryTag);
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 2ce8caefc9..3de89a1d70 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -25,17 +25,16 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.v0_8.handler.BasicGetMethodHandler;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.StateChangeListener;
@@ -71,7 +70,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
- private Consumer _consumer;
+ private ConsumerImpl _consumer;
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
@@ -368,18 +367,18 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
}
- public Consumer getConsumer()
+ public ConsumerImpl getConsumer()
{
return _consumer;
}
@Override
- public void consumerRemoved(final Consumer sub)
+ public void consumerRemoved(final ConsumerImpl sub)
{
}
@Override
- public void consumerAdded(final Consumer sub)
+ public void consumerAdded(final ConsumerImpl sub)
{
_consumer = sub;
}
@@ -428,7 +427,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
boolean closed = false;
State state = getState();
- final Consumer consumer = getConsumer();
+ final ConsumerImpl consumer = getConsumer();
if(consumer != null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
index 1de1638c2e..7a2fdb05fc 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
@@ -23,8 +23,8 @@ package org.apache.qpid.server.protocol.v0_8;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.consumer.Consumer;
import java.util.Map;
@@ -49,7 +49,7 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
{
message.setRedelivered();
- final Consumer consumer = message.getDeliveredConsumer();
+ final ConsumerImpl consumer = message.getDeliveredConsumer();
if (consumer != null)
{
// Consumer exists
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
index 70d7da3432..c13ff17f67 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.consumer.Consumer;
public interface RecordDeliveryMethod
{
- void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag);
+ void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag);
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index f620abf30f..76b5cbbbb9 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -29,6 +29,7 @@ import org.apache.qpid.framing.BasicGetBody;
import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageSource;
@@ -44,7 +45,6 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.security.AccessControlException;
@@ -150,15 +150,15 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
+ public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag)
{
channel.addUnacknowledgedMessage(entry, deliveryTag, null);
}
};
ConsumerTarget_0_8 target;
- EnumSet<Consumer.Option> options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES);
+ EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES);
if(acks)
{
@@ -173,7 +173,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
- Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
+ ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
sub.flush();
sub.close();
return(getDeliveryMethod.hasDeliveredMessage());
@@ -202,7 +202,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
@Override
- public void deliverToClient(final Consumer sub, final ServerMessage message,
+ public void deliverToClient(final ConsumerImpl sub, final ServerMessage message,
final InstanceProperties props, final long deliveryTag)
{
_singleMessageCredit.useCreditForMessage(message.getSize());
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index 8d7de4cd93..e5cfced4e2 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -26,13 +26,13 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -49,7 +49,7 @@ import java.util.Set;
public class AckTest extends QpidTestCase
{
private ConsumerTarget_0_8 _subscriptionTarget;
- private Consumer _consumer;
+ private ConsumerImpl _consumer;
private AMQProtocolSession _protocolSession;
@@ -176,8 +176,8 @@ public class AckTest extends QpidTestCase
null,
new LimitlessCreditManager());
_consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Consumer.Option.SEES_REQUEUES,
- Consumer.Option.ACQUIRES));
+ EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
+ ConsumerImpl.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -209,8 +209,8 @@ public class AckTest extends QpidTestCase
null,
AMQMessage.class,
DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Consumer.Option.SEES_REQUEUES,
- Consumer.Option.ACQUIRES));
+ EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
+ ConsumerImpl.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -232,7 +232,7 @@ public class AckTest extends QpidTestCase
null,
new LimitlessCreditManager());
_consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES));
+ EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
@@ -255,8 +255,8 @@ public class AckTest extends QpidTestCase
null,
new LimitlessCreditManager());
_consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Consumer.Option.SEES_REQUEUES,
- Consumer.Option.ACQUIRES));
+ EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
+ ConsumerImpl.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
@@ -292,8 +292,8 @@ public class AckTest extends QpidTestCase
null,
new LimitlessCreditManager());
_consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Consumer.Option.SEES_REQUEUES,
- Consumer.Option.ACQUIRES));
+ EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
+ ConsumerImpl.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
@@ -326,8 +326,8 @@ public class AckTest extends QpidTestCase
null,
new LimitlessCreditManager());
_consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Consumer.Option.SEES_REQUEUES,
- Consumer.Option.ACQUIRES));
+ EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
+ ConsumerImpl.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
@@ -360,8 +360,8 @@ public class AckTest extends QpidTestCase
_subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
_consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Consumer.Option.SEES_REQUEUES,
- Consumer.Option.ACQUIRES));
+ EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
+ ConsumerImpl.Option.ACQUIRES));
final int msgCount = 1;
publishMessages(msgCount);
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
index f18da87d09..6d3e648369 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
@@ -23,11 +23,11 @@ package org.apache.qpid.server.protocol.v0_8;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.consumer.Consumer;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -63,7 +63,7 @@ public class ExtractResendAndRequeueTest extends TestCase
private static final int INITIAL_MSG_COUNT = 10;
private AMQQueue _queue;
private LinkedList<MessageInstance> _referenceList = new LinkedList<MessageInstance>();
- private Consumer _consumer;
+ private ConsumerImpl _consumer;
private boolean _queueDeleted;
@Override
@@ -74,8 +74,8 @@ public class ExtractResendAndRequeueTest extends TestCase
_queue = mock(AMQQueue.class);
when(_queue.getName()).thenReturn(getName());
when(_queue.isDeleted()).thenReturn(_queueDeleted);
- _consumer = mock(Consumer.class);
- when(_consumer.getConsumerNumber()).thenReturn(Consumer.CONSUMER_NUMBER_GENERATOR.getAndIncrement());
+ _consumer = mock(ConsumerImpl.class);
+ when(_consumer.getConsumerNumber()).thenReturn(ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement());
long id = 0;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index eaa5b6a7a5..18949bba50 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -41,6 +41,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.ServerMessage;
@@ -48,7 +49,6 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -244,7 +244,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
@Override
- public void deliverToClient(Consumer sub, ServerMessage message,
+ public void deliverToClient(ConsumerImpl sub, ServerMessage message,
InstanceProperties props, long deliveryTag)
{
_deliveryCount.incrementAndGet();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 0a53a6436a..00c78581e1 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -40,6 +40,8 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -51,6 +53,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
@@ -68,6 +71,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
private final Object _reference = new Object();
private final Subject _subject = new Subject();
+ private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
+ new CopyOnWriteArrayList<SessionModelListener>();
private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
@@ -111,7 +116,6 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
_connectionId = connectionId;
_subject.getPrincipals().add(new ConnectionPrincipal(this));
_subjectCreator = subjectCreator;
- //_vhost.getConnectionRegistry().registerConnection(this);
}
@@ -129,6 +133,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
host = (String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST);
}
_vhost = _broker.getVirtualHostRegistry().getVirtualHost(host);
+ _vhost.getConnectionRegistry().registerConnection(this);
+
if(_vhost == null)
{
final Error err = new Error();
@@ -147,6 +153,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
{
final Session_1_0 session = new Session_1_0(this, endpoint);
_sessions.add(session);
+ sessionAdded(session);
endpoint.setSessionEventListener(new SessionEventListener()
{
@Override
@@ -182,6 +189,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
void sessionEnded(Session_1_0 session)
{
_sessions.remove(session);
+ sessionRemoved(session);
}
public void removeDeleteTask(final Action<? super Connection_1_0> task)
@@ -428,4 +436,35 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
{
return _vhost;
}
+
+
+ @Override
+ public void addSessionListener(final SessionModelListener listener)
+ {
+ _sessionListeners.add(listener);
+ }
+
+ @Override
+ public void removeSessionListener(final SessionModelListener listener)
+ {
+ _sessionListeners.remove(listener);
+ }
+
+ private void sessionAdded(final AMQSessionModel<?,?> session)
+ {
+ for(SessionModelListener l : _sessionListeners)
+ {
+ l.sessionAdded(session);
+ }
+ }
+
+ private void sessionRemoved(final AMQSessionModel<?,?> session)
+ {
+ for(SessionModelListener l : _sessionListeners)
+ {
+ l.sessionRemoved(session);
+ }
+ }
+
+
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index f3417710a5..adb2f8ea6a 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -37,13 +37,13 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -60,7 +60,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
private Binary _transactionId;
private final AMQPDescribedTypeRegistry _typeRegistry;
private final SectionEncoder _sectionEncoder;
- private Consumer _consumer;
+ private ConsumerImpl _consumer;
public ConsumerTarget_1_0(final SendingLink_1_0 link,
boolean acquires)
@@ -72,7 +72,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
_acquires = acquires;
}
- public Consumer getConsumer()
+ public ConsumerImpl getConsumer()
{
return _consumer;
}
@@ -498,13 +498,13 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
@Override
- public void consumerAdded(final Consumer sub)
+ public void consumerAdded(final ConsumerImpl sub)
{
_consumer = sub;
}
@Override
- public void consumerRemoved(final Consumer sub)
+ public void consumerRemoved(final ConsumerImpl sub)
{
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 24395a6fad..eb1f75b771 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.server.binding.BindingImpl;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -55,7 +56,6 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -69,7 +69,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
private VirtualHost _vhost;
private SendingDestination _destination;
- private Consumer _consumer;
+ private ConsumerImpl _consumer;
private ConsumerTarget_1_0 _target;
private boolean _draining;
@@ -99,7 +99,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
linkAttachment.setDeliveryStateHandler(this);
QueueDestination qd = null;
- EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
+ EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
boolean noLocal = false;
@@ -163,8 +163,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
_target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
if(source.getDistributionMode() != StdDistMode.COPY)
{
- options.add(Consumer.Option.ACQUIRES);
- options.add(Consumer.Option.SEES_REQUEUES);
+ options.add(ConsumerImpl.Option.ACQUIRES);
+ options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
}
@@ -318,8 +318,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
_target = new ConsumerTarget_1_0(this, true);
- options.add(Consumer.Option.ACQUIRES);
- options.add(Consumer.Option.SEES_REQUEUES);
+ options.add(ConsumerImpl.Option.ACQUIRES);
+ options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
else
@@ -331,7 +331,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
if(noLocal)
{
- options.add(Consumer.Option.NO_LOCAL);
+ options.add(ConsumerImpl.Option.NO_LOCAL);
}
try
@@ -372,7 +372,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
public void resume(SendingLinkAttachment linkAttachment)
{
_linkAttachment = linkAttachment;
-
}
public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
@@ -692,4 +691,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
return _vhost;
}
+
+ public ConsumerImpl getConsumer()
+ {
+ return _consumer;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 411117be4d..e124b4d5ac 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -43,6 +43,7 @@ import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.*;
import org.apache.qpid.protocol.AMQConstant;
@@ -50,6 +51,7 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -85,6 +87,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
private AtomicBoolean _closed = new AtomicBoolean();
private final Subject _subject = new Subject();
+ private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+ private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
+ private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
public Session_1_0(final Connection_1_0 connection, final SessionEndpoint endpoint)
@@ -184,6 +189,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
);
sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink));
+ registerConsumer(sendingLink.getConsumer());
link = sendingLink;
if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
@@ -383,6 +389,17 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
}
}
+ private void registerConsumer(final ConsumerImpl consumer)
+ {
+ if(consumer instanceof Consumer<?>)
+ {
+ Consumer<?> modelConsumer = (Consumer<?>) consumer;
+ _consumers.add(modelConsumer);
+ modelConsumer.addChangeListener(_consumerClosedListener);
+ consumerAdded(modelConsumer);
+ }
+ }
+
private AMQQueue createTemporaryQueue(Map properties)
{
@@ -653,11 +670,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
@Override
public int getConsumerCount()
{
- // TODO
- return 0;
+ return getConsumers().size();
}
+
public String toLogString()
{
long connectionId = getConnectionModel().getConnectionId();
@@ -785,4 +802,72 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
});
}
}
+
+
+ @Override
+ public Collection<Consumer<?>> getConsumers()
+ {
+ return Collections.unmodifiableCollection(_consumers);
+ }
+
+ @Override
+ public void addConsumerListener(final ConsumerListener listener)
+ {
+ _consumerListeners.add(listener);
+ }
+
+ @Override
+ public void removeConsumerListener(final ConsumerListener listener)
+ {
+ _consumerListeners.remove(listener);
+ }
+
+ private void consumerAdded(Consumer<?> consumer)
+ {
+ for(ConsumerListener l : _consumerListeners)
+ {
+ l.consumerAdded(consumer);
+ }
+ }
+
+ private void consumerRemoved(Consumer<?> consumer)
+ {
+ for(ConsumerListener l : _consumerListeners)
+ {
+ l.consumerRemoved(consumer);
+ }
+ }
+
+ private class ConsumerClosedListener implements ConfigurationChangeListener
+ {
+ @Override
+ public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState)
+ {
+ if(newState == org.apache.qpid.server.model.State.DELETED)
+ {
+ consumerRemoved((Consumer<?>)object);
+ }
+ }
+
+ @Override
+ public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void attributeSet(final ConfiguredObject object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index a47506f804..788ce63c8f 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.management.amqp;
-import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
@@ -949,7 +949,7 @@ class ManagementNode implements MessageSource, MessageDestination
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- final EnumSet<Consumer.Option> options)
+ final EnumSet<ConsumerImpl.Option> options)
{
final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target);
@@ -1054,7 +1054,7 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
- public boolean isAcquiredBy(final Consumer consumer)
+ public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return false;
}
@@ -1072,7 +1072,7 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
- public Consumer getDeliveredConsumer()
+ public ConsumerImpl getDeliveredConsumer()
{
return null;
}
@@ -1084,7 +1084,7 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
- public boolean isRejectedBy(final Consumer consumer)
+ public boolean isRejectedBy(final ConsumerImpl consumer)
{
return false;
}
@@ -1102,7 +1102,7 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
- public boolean acquire(final Consumer sub)
+ public boolean acquire(final ConsumerImpl sub)
{
return false;
}
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 8a1f39fdfe..a3b1f932ac 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.management.amqp;
-import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.internal.InternalMessage;
@@ -33,9 +33,9 @@ import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-class ManagementNodeConsumer implements Consumer
+class ManagementNodeConsumer implements ConsumerImpl
{
- private final long _id = Consumer.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
+ private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
private final ManagementNode _managementNode;
private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
private final ConsumerTarget _target;
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index 18c68bd198..ae2828d392 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.management.amqp;
-import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
@@ -84,7 +84,7 @@ class ManagementResponse implements MessageInstance
}
@Override
- public boolean isAcquiredBy(final Consumer consumer)
+ public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return consumer == _consumer && !isDeleted();
}
@@ -114,7 +114,7 @@ class ManagementResponse implements MessageInstance
}
@Override
- public boolean isRejectedBy(final Consumer consumer)
+ public boolean isRejectedBy(final ConsumerImpl consumer)
{
return false;
}
@@ -132,7 +132,7 @@ class ManagementResponse implements MessageInstance
}
@Override
- public boolean acquire(final Consumer sub)
+ public boolean acquire(final ConsumerImpl sub)
{
return false;
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
index baf92e8522..0947ae2a89 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
@@ -31,6 +31,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -40,7 +41,6 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.consumer.Consumer;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
@@ -327,7 +327,7 @@ public class MessageServlet extends AbstractServlet
: entry.isAcquired()
? "Acquired"
: "");
- final Consumer deliveredConsumer = entry.getDeliveredConsumer();
+ final ConsumerImpl deliveredConsumer = entry.getDeliveredConsumer();
object.put("deliveredTo", deliveredConsumer == null ? null : deliveredConsumer.getConsumerNumber());
ServerMessage message = entry.getMessage();