diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-03-08 18:56:42 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-03-08 18:56:42 +0000 |
| commit | b2023145c2b88ee458429663536cbab7ddd8f3b0 (patch) | |
| tree | 259f7ed1e2372025c7a65338abc3a58ef6b88e74 /qpid/java/broker-plugins | |
| parent | 19b2671cbd4af77ac52c222605c09b06cab7ced6 (diff) | |
| download | qpid-python-b2023145c2b88ee458429663536cbab7ddd8f3b0.tar.gz | |
QPID-5617 : [Java Broker] restore or implement child added/removed notifications for configured objects within the vhost
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1575591 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
22 files changed, 480 insertions, 110 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index eeafb30642..a3fabf076c 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_10; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.logging.EventLogger; @@ -31,7 +32,6 @@ import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.consumer.AbstractConsumerTarget; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -66,7 +66,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private final Map<String, Object> _arguments; private int _deferredMessageCredit; private long _deferredSizeCredit; - private Consumer _consumer; + private ConsumerImpl _consumer; public ConsumerTarget_0_10(ServerSession session, @@ -90,7 +90,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC _name = name; } - public Consumer getConsumer() + public ConsumerImpl getConsumer() { return _consumer; } @@ -105,7 +105,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC boolean closed = false; State state = getState(); - final Consumer consumer = getConsumer(); + final ConsumerImpl consumer = getConsumer(); if(consumer != null) { consumer.getSendLock(); @@ -569,13 +569,13 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC @Override - public void consumerAdded(final Consumer sub) + public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; } @Override - public void consumerRemoved(final Consumer sub) + public void consumerRemoved(final ConsumerImpl sub) { } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index a3a80415ac..5e899aa635 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; @@ -80,6 +81,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList = new CopyOnWriteArrayList<Action<? super ServerConnection>>(); + private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = + new CopyOnWriteArrayList<SessionModelListener>(); + private volatile boolean _stopped; public ServerConnection(final long connectionId, Broker broker) @@ -383,6 +387,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S public synchronized void registerSession(final Session ssn) { super.registerSession(ssn); + sessionAdded((ServerSession)ssn); if(_blocking) { ((ServerSession)ssn).block(); @@ -392,6 +397,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S @Override public synchronized void removeSession(final Session ssn) { + sessionRemoved((ServerSession)ssn); super.removeSession(ssn); } @@ -552,6 +558,35 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } @Override + public void addSessionListener(final SessionModelListener listener) + { + _sessionListeners.add(listener); + } + + @Override + public void removeSessionListener(final SessionModelListener listener) + { + _sessionListeners.remove(listener); + } + + private void sessionAdded(final AMQSessionModel<?,?> session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionAdded(session); + } + } + + private void sessionRemoved(final AMQSessionModel<?,?> session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionRemoved(session); + } + } + + + @Override public String getClientVersion() { return getConnectionDelegate().getClientVersion(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index c2eacfe6e8..0bb3008d13 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -45,6 +45,12 @@ import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; @@ -108,6 +114,9 @@ public class ServerSession extends Session private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction(); + private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); + private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); + public static interface MessageDispositionChangeListener { @@ -133,6 +142,7 @@ public class ServerSession extends Session private final AtomicLong _txnCount = new AtomicLong(0); private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>(); + private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>(); private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>(); @@ -458,6 +468,18 @@ public class ServerSession extends Session _subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub); } + + public void register(final ConsumerImpl consumerImpl) + { + if(consumerImpl instanceof Consumer<?>) + { + final Consumer<?> consumer = (Consumer<?>) consumerImpl; + _consumers.add(consumer); + consumer.addChangeListener(_consumerClosedListener); + consumerAdded(consumer); + } + } + public ConsumerTarget_0_10 getSubscription(String destination) { return _subscriptions.get(destination == null ? NULL_DESTINATION : destination); @@ -949,6 +971,41 @@ public class ServerSession extends Session } @Override + public Collection<Consumer<?>> getConsumers() + { + + return Collections.unmodifiableCollection(_consumers); + } + + @Override + public void addConsumerListener(final ConsumerListener listener) + { + _consumerListeners.add(listener); + } + + @Override + public void removeConsumerListener(final ConsumerListener listener) + { + _consumerListeners.remove(listener); + } + + private void consumerAdded(Consumer<?> consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerAdded(consumer); + } + } + + private void consumerRemoved(Consumer<?> consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerRemoved(consumer); + } + } + + @Override public int compareTo(ServerSession o) { return getId().compareTo(o.getId()); @@ -966,4 +1023,37 @@ public class ServerSession extends Session } } } + + private class ConsumerClosedListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState) + { + if(newState == org.apache.qpid.server.model.State.DELETED) + { + consumerRemoved((Consumer<?>)object); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 2593c66191..040be92ceb 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -27,6 +27,7 @@ import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.ExclusivityPolicy; @@ -51,7 +52,6 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.DtxNotSelectedException; import org.apache.qpid.server.txn.IncorrectDtxStateException; @@ -234,25 +234,25 @@ public class ServerSessionDelegate extends SessionDelegate ((ServerSession)session).register(destination, target); try { - EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class); + EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class); if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED) { - options.add(Consumer.Option.ACQUIRES); + options.add(ConsumerImpl.Option.ACQUIRES); } if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT) { - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } if(method.getExclusive()) { - options.add(Consumer.Option.EXCLUSIVE); + options.add(ConsumerImpl.Option.EXCLUSIVE); } - Consumer sub = + ((ServerSession)session).register( queue.addConsumer(target, filterManager, MessageTransferMessage.class, destination, - options); + options)); } catch (AMQQueue.ExistingExclusiveConsumer existing) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 780e7ad199..baf5eceef7 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -33,6 +33,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.Filterable; @@ -66,7 +67,12 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.protocol.CapacityChecker; +import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -76,7 +82,6 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; @@ -173,6 +178,9 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); private final ImmediateAction _immediateAction = new ImmediateAction(); private Subject _subject; + private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>(); + private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); + private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); public AMQChannel(T session, int channelId, final MessageStore messageStore) @@ -526,7 +534,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } - public Consumer getSubscription(AMQShortString tag) + public ConsumerImpl getSubscription(AMQShortString tag) { final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag); return target == null ? null : target.getConsumer(); @@ -545,7 +553,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> * @param exclusive Flag requesting exclusive access to the queue * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests * - * @throws AMQException if something goes wrong + * @throws org.apache.qpid.AMQException if something goes wrong */ public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, FieldTable filters, boolean exclusive, boolean noLocal) @@ -564,7 +572,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } ConsumerTarget_0_8 target; - EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class); + EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class); if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()))) { @@ -573,19 +581,19 @@ public class AMQChannel<T extends AMQProtocolSession<T>> else if(acks) { target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager); - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } else { target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager); - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } if(exclusive) { - options.add(Consumer.Option.EXCLUSIVE); + options.add(ConsumerImpl.Option.EXCLUSIVE); } @@ -615,12 +623,19 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } }); } - Consumer sub = + ConsumerImpl sub = source.addConsumer(target, filterManager, - AMQMessage.class, - AMQShortString.toString(tag), - options); + AMQMessage.class, + AMQShortString.toString(tag), + options); + if(sub instanceof Consumer<?>) + { + final Consumer<?> modelConsumer = (Consumer<?>) sub; + consumerAdded(modelConsumer); + modelConsumer.addChangeListener(_consumerClosedListener); + _consumers.add(modelConsumer); + } } catch (AccessControlException e) { @@ -659,15 +674,19 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag); - Consumer sub = target == null ? null : target.getConsumer(); + ConsumerImpl sub = target == null ? null : target.getConsumer(); if (sub != null) { sub.close(); + if(sub instanceof Consumer<?>) + { + _consumers.remove(sub); + } return true; } else { - _logger.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered."); + _logger.warn("Attempt to unsubscribe consumer with tag '" + consumerTag + "' which is not registered."); } return false; } @@ -735,7 +754,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); } - Consumer sub = me.getValue().getConsumer(); + ConsumerImpl sub = me.getValue().getConsumer(); if(sub != null) { @@ -754,7 +773,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> * delivery tag) * @param consumer The consumer that is to acknowledge this message. */ - public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer) + public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, ConsumerImpl consumer) { if (_logger.isDebugEnabled()) { @@ -1126,7 +1145,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> for(MessageInstance entry : _resendList) { - Consumer sub = entry.getDeliveredConsumer(); + ConsumerImpl sub = entry.getDeliveredConsumer(); if(sub == null || sub.isClosed()) { entry.release(); @@ -1199,7 +1218,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod() { - public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag) + public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag) { addUnacknowledgedMessage(entry, deliveryTag, sub); } @@ -1658,4 +1677,71 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { return _tag2SubscriptionTargetMap.size(); } + + @Override + public Collection<Consumer<?>> getConsumers() + { + return Collections.unmodifiableCollection(_consumers); + } + + private class ConsumerClosedListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final State oldState, final State newState) + { + if(newState == State.DELETED) + { + consumerRemoved((Consumer<?>)object); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } + + private void consumerAdded(final Consumer<?> consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerAdded(consumer); + } + } + + private void consumerRemoved(final Consumer<?> consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerRemoved(consumer); + } + } + + @Override + public void addConsumerListener(ConsumerListener listener) + { + _consumerListeners.add(listener); + } + + @Override + public void removeConsumerListener(ConsumerListener listener) + { + _consumerListeners.remove(listener); + } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 2ebcde199b..a86530fe0e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -56,10 +56,13 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ConnectionMessages; @@ -73,7 +76,6 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -102,6 +104,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap = new HashMap<Integer, AMQChannel<AMQProtocolEngine>>(); + private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = + new CopyOnWriteArrayList<SessionModelListener>(); @SuppressWarnings("unchecked") private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; @@ -759,7 +763,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi synchronized (_channelMap) { _channelMap.put(channel.getChannelId(), channel); - + sessionAdded(channel); if(_blocking) { channel.block(); @@ -773,6 +777,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } + private void sessionAdded(final AMQSessionModel<?,?> session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionAdded(session); + } + } + + private void sessionRemoved(final AMQSessionModel<?,?> session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionRemoved(session); + } + } + public Long getMaximumNumberOfChannels() { return _maxNoOfChannels; @@ -844,15 +864,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi */ public void removeChannel(int channelId) { + AMQChannel<AMQProtocolEngine> session; synchronized (_channelMap) { - _channelMap.remove(channelId); - + session = _channelMap.remove(channelId); if ((channelId & CHANNEL_CACHE_SIZE) == channelId) { _cachedChannels[channelId] = null; } } + sessionRemoved(session); } /** @@ -1509,6 +1530,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return String.valueOf(getContextKey()); } + @Override + public void addSessionListener(final SessionModelListener listener) + { + _sessionListeners.add(listener); + } + + @Override + public void removeSessionListener(final SessionModelListener listener) + { + _sessionListeners.remove(listener); + } + public void setDeferFlush(boolean deferFlush) { _deferFlush = deferFlush; @@ -1525,7 +1558,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } @Override - public void deliverToClient(final Consumer sub, final ServerMessage message, + public void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) { registerMessageDelivered(message.getSize()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java index 9f8799f68e..fa26a73f93 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java @@ -20,13 +20,12 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.consumer.Consumer; public interface ClientDeliveryMethod { - void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props, + void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 2ce8caefc9..3de89a1d70 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -25,17 +25,16 @@ import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.v0_8.handler.BasicGetMethodHandler; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.consumer.AbstractConsumerTarget; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.StateChangeListener; @@ -71,7 +70,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen private final AtomicLong _unacknowledgedCount = new AtomicLong(0); private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); - private Consumer _consumer; + private ConsumerImpl _consumer; public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, @@ -368,18 +367,18 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } } - public Consumer getConsumer() + public ConsumerImpl getConsumer() { return _consumer; } @Override - public void consumerRemoved(final Consumer sub) + public void consumerRemoved(final ConsumerImpl sub) { } @Override - public void consumerAdded(final Consumer sub) + public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; } @@ -428,7 +427,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen boolean closed = false; State state = getState(); - final Consumer consumer = getConsumer(); + final ConsumerImpl consumer = getConsumer(); if(consumer != null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java index 1de1638c2e..7a2fdb05fc 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java @@ -23,8 +23,8 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.consumer.Consumer; import java.util.Map; @@ -49,7 +49,7 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor { message.setRedelivered(); - final Consumer consumer = message.getDeliveredConsumer(); + final ConsumerImpl consumer = message.getDeliveredConsumer(); if (consumer != null) { // Consumer exists diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java index 70d7da3432..c13ff17f67 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.server.protocol.v0_8; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.consumer.Consumer; public interface RecordDeliveryMethod { - void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag); + void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index f620abf30f..76b5cbbbb9 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -29,6 +29,7 @@ import org.apache.qpid.framing.BasicGetBody; import org.apache.qpid.framing.BasicGetEmptyBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; @@ -44,7 +45,6 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.virtualhost.VirtualHost; import java.security.AccessControlException; @@ -150,15 +150,15 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() { - public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag) + public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag) { channel.addUnacknowledgedMessage(entry, deliveryTag, null); } }; ConsumerTarget_0_8 target; - EnumSet<Consumer.Option> options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES); + EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES); if(acks) { @@ -173,7 +173,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB singleMessageCredit, getDeliveryMethod, getRecordMethod); } - Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options); + ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options); sub.flush(); sub.close(); return(getDeliveryMethod.hasDeliveredMessage()); @@ -202,7 +202,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } @Override - public void deliverToClient(final Consumer sub, final ServerMessage message, + public void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) { _singleMessageCredit.useCreditForMessage(message.getSize()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index 8d7de4cd93..e5cfced4e2 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -26,13 +26,13 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.BrokerTestHelper; @@ -49,7 +49,7 @@ import java.util.Set; public class AckTest extends QpidTestCase { private ConsumerTarget_0_8 _subscriptionTarget; - private Consumer _consumer; + private ConsumerImpl _consumer; private AMQProtocolSession _protocolSession; @@ -176,8 +176,8 @@ public class AckTest extends QpidTestCase null, new LimitlessCreditManager()); _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, + ConsumerImpl.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount, true); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); @@ -209,8 +209,8 @@ public class AckTest extends QpidTestCase null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, + ConsumerImpl.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); @@ -232,7 +232,7 @@ public class AckTest extends QpidTestCase null, new LimitlessCreditManager()); _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount, true); @@ -255,8 +255,8 @@ public class AckTest extends QpidTestCase null, new LimitlessCreditManager()); _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, + ConsumerImpl.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); @@ -292,8 +292,8 @@ public class AckTest extends QpidTestCase null, new LimitlessCreditManager()); _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, + ConsumerImpl.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); @@ -326,8 +326,8 @@ public class AckTest extends QpidTestCase null, new LimitlessCreditManager()); _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, + ConsumerImpl.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); @@ -360,8 +360,8 @@ public class AckTest extends QpidTestCase _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager); _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, + ConsumerImpl.Option.ACQUIRES)); final int msgCount = 1; publishMessages(msgCount); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java index f18da87d09..6d3e648369 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java @@ -23,11 +23,11 @@ package org.apache.qpid.server.protocol.v0_8; import junit.framework.TestCase; import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.consumer.Consumer; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -63,7 +63,7 @@ public class ExtractResendAndRequeueTest extends TestCase private static final int INITIAL_MSG_COUNT = 10; private AMQQueue _queue; private LinkedList<MessageInstance> _referenceList = new LinkedList<MessageInstance>(); - private Consumer _consumer; + private ConsumerImpl _consumer; private boolean _queueDeleted; @Override @@ -74,8 +74,8 @@ public class ExtractResendAndRequeueTest extends TestCase _queue = mock(AMQQueue.class); when(_queue.getName()).thenReturn(getName()); when(_queue.isDeleted()).thenReturn(_queueDeleted); - _consumer = mock(Consumer.class); - when(_consumer.getConsumerNumber()).thenReturn(Consumer.CONSUMER_NUMBER_GENERATOR.getAndIncrement()); + _consumer = mock(ConsumerImpl.class); + when(_consumer.getConsumerNumber()).thenReturn(ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement()); long id = 0; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index eaa5b6a7a5..18949bba50 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -41,6 +41,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; @@ -48,7 +49,6 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.auth.UsernamePrincipal; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; @@ -244,7 +244,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr @Override - public void deliverToClient(Consumer sub, ServerMessage message, + public void deliverToClient(ConsumerImpl sub, ServerMessage message, InstanceProperties props, long deliveryTag) { _deliveryCount.incrementAndGet(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 0a53a6436a..00c78581e1 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -40,6 +40,8 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; @@ -51,6 +53,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; @@ -68,6 +71,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private final Object _reference = new Object(); private final Subject _subject = new Subject(); + private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = + new CopyOnWriteArrayList<SessionModelListener>(); private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter(); private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter(); @@ -111,7 +116,6 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod _connectionId = connectionId; _subject.getPrincipals().add(new ConnectionPrincipal(this)); _subjectCreator = subjectCreator; - //_vhost.getConnectionRegistry().registerConnection(this); } @@ -129,6 +133,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod host = (String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST); } _vhost = _broker.getVirtualHostRegistry().getVirtualHost(host); + _vhost.getConnectionRegistry().registerConnection(this); + if(_vhost == null) { final Error err = new Error(); @@ -147,6 +153,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod { final Session_1_0 session = new Session_1_0(this, endpoint); _sessions.add(session); + sessionAdded(session); endpoint.setSessionEventListener(new SessionEventListener() { @Override @@ -182,6 +189,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod void sessionEnded(Session_1_0 session) { _sessions.remove(session); + sessionRemoved(session); } public void removeDeleteTask(final Action<? super Connection_1_0> task) @@ -428,4 +436,35 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod { return _vhost; } + + + @Override + public void addSessionListener(final SessionModelListener listener) + { + _sessionListeners.add(listener); + } + + @Override + public void removeSessionListener(final SessionModelListener listener) + { + _sessionListeners.remove(listener); + } + + private void sessionAdded(final AMQSessionModel<?,?> session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionAdded(session); + } + } + + private void sessionRemoved(final AMQSessionModel<?,?> session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionRemoved(session); + } + } + + } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index f3417710a5..adb2f8ea6a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -37,13 +37,13 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.consumer.AbstractConsumerTarget; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -60,7 +60,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget private Binary _transactionId; private final AMQPDescribedTypeRegistry _typeRegistry; private final SectionEncoder _sectionEncoder; - private Consumer _consumer; + private ConsumerImpl _consumer; public ConsumerTarget_1_0(final SendingLink_1_0 link, boolean acquires) @@ -72,7 +72,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget _acquires = acquires; } - public Consumer getConsumer() + public ConsumerImpl getConsumer() { return _consumer; } @@ -498,13 +498,13 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } @Override - public void consumerAdded(final Consumer sub) + public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; } @Override - public void consumerRemoved(final Consumer sub) + public void consumerRemoved(final ConsumerImpl sub) { } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 24395a6fad..eb1f75b771 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; @@ -55,7 +56,6 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -69,7 +69,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS private VirtualHost _vhost; private SendingDestination _destination; - private Consumer _consumer; + private ConsumerImpl _consumer; private ConsumerTarget_1_0 _target; private boolean _draining; @@ -99,7 +99,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS linkAttachment.setDeliveryStateHandler(this); QueueDestination qd = null; - EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class); + EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class); boolean noLocal = false; @@ -163,8 +163,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY); if(source.getDistributionMode() != StdDistMode.COPY) { - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } } @@ -318,8 +318,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _target = new ConsumerTarget_1_0(this, true); - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } else @@ -331,7 +331,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { if(noLocal) { - options.add(Consumer.Option.NO_LOCAL); + options.add(ConsumerImpl.Option.NO_LOCAL); } try @@ -372,7 +372,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS public void resume(SendingLinkAttachment linkAttachment) { _linkAttachment = linkAttachment; - } public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) @@ -692,4 +691,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { return _vhost; } + + public ConsumerImpl getConsumer() + { + return _consumer; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 411117be4d..e124b4d5ac 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -43,6 +43,7 @@ import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.*; import org.apache.qpid.protocol.AMQConstant; @@ -50,6 +51,7 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -85,6 +87,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio private AtomicBoolean _closed = new AtomicBoolean(); private final Subject _subject = new Subject(); + private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>(); + private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); + private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); public Session_1_0(final Connection_1_0 connection, final SessionEndpoint endpoint) @@ -184,6 +189,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio ); sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink)); + registerConsumer(sendingLink.getConsumer()); link = sendingLink; if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())) @@ -383,6 +389,17 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } } + private void registerConsumer(final ConsumerImpl consumer) + { + if(consumer instanceof Consumer<?>) + { + Consumer<?> modelConsumer = (Consumer<?>) consumer; + _consumers.add(modelConsumer); + modelConsumer.addChangeListener(_consumerClosedListener); + consumerAdded(modelConsumer); + } + } + private AMQQueue createTemporaryQueue(Map properties) { @@ -653,11 +670,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio @Override public int getConsumerCount() { - // TODO - return 0; + return getConsumers().size(); } + public String toLogString() { long connectionId = getConnectionModel().getConnectionId(); @@ -785,4 +802,72 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio }); } } + + + @Override + public Collection<Consumer<?>> getConsumers() + { + return Collections.unmodifiableCollection(_consumers); + } + + @Override + public void addConsumerListener(final ConsumerListener listener) + { + _consumerListeners.add(listener); + } + + @Override + public void removeConsumerListener(final ConsumerListener listener) + { + _consumerListeners.remove(listener); + } + + private void consumerAdded(Consumer<?> consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerAdded(consumer); + } + } + + private void consumerRemoved(Consumer<?> consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerRemoved(consumer); + } + } + + private class ConsumerClosedListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState) + { + if(newState == org.apache.qpid.server.model.State.DELETED) + { + consumerRemoved((Consumer<?>)object); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } } diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index a47506f804..788ce63c8f 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.management.amqp; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.Filterable; @@ -949,7 +949,7 @@ class ManagementNode implements MessageSource, MessageDestination final FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, - final EnumSet<Consumer.Option> options) + final EnumSet<ConsumerImpl.Option> options) { final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target); @@ -1054,7 +1054,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public boolean isAcquiredBy(final Consumer consumer) + public boolean isAcquiredBy(final ConsumerImpl consumer) { return false; } @@ -1072,7 +1072,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public Consumer getDeliveredConsumer() + public ConsumerImpl getDeliveredConsumer() { return null; } @@ -1084,7 +1084,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public boolean isRejectedBy(final Consumer consumer) + public boolean isRejectedBy(final ConsumerImpl consumer) { return false; } @@ -1102,7 +1102,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public boolean acquire(final Consumer sub) + public boolean acquire(final ConsumerImpl sub) { return false; } diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index 8a1f39fdfe..a3b1f932ac 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.management.amqp; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.internal.InternalMessage; @@ -33,9 +33,9 @@ import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -class ManagementNodeConsumer implements Consumer +class ManagementNodeConsumer implements ConsumerImpl { - private final long _id = Consumer.CONSUMER_NUMBER_GENERATOR.getAndIncrement(); + private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement(); private final ManagementNode _managementNode; private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>()); private final ConsumerTarget _target; diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java index 18c68bd198..ae2828d392 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.management.amqp; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; @@ -84,7 +84,7 @@ class ManagementResponse implements MessageInstance } @Override - public boolean isAcquiredBy(final Consumer consumer) + public boolean isAcquiredBy(final ConsumerImpl consumer) { return consumer == _consumer && !isDeleted(); } @@ -114,7 +114,7 @@ class ManagementResponse implements MessageInstance } @Override - public boolean isRejectedBy(final Consumer consumer) + public boolean isRejectedBy(final ConsumerImpl consumer) { return false; } @@ -132,7 +132,7 @@ class ManagementResponse implements MessageInstance } @Override - public boolean acquire(final Consumer sub) + public boolean acquire(final ConsumerImpl sub) { return false; } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java index baf92e8522..0947ae2a89 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java @@ -31,6 +31,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -40,7 +41,6 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; -import org.apache.qpid.server.consumer.Consumer; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; @@ -327,7 +327,7 @@ public class MessageServlet extends AbstractServlet : entry.isAcquired() ? "Acquired" : ""); - final Consumer deliveredConsumer = entry.getDeliveredConsumer(); + final ConsumerImpl deliveredConsumer = entry.getDeliveredConsumer(); object.put("deliveredTo", deliveredConsumer == null ? null : deliveredConsumer.getConsumerNumber()); ServerMessage message = entry.getMessage(); |
