From 1635ca5849b7c765d5d7be9cd01d46b06349f320 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 26 Aug 2014 17:01:07 +0000 Subject: QPID-6040 : [Java Broker] [Java Client] add the ability to create a single consumer that is consuming across a collection of queues git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1620659 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/consumer/AbstractConsumerTarget.java | 50 ++++++++++--- .../qpid/server/consumer/ConsumerTarget.java | 13 +++- .../qpid/server/message/MessageInstance.java | 2 + .../apache/qpid/server/queue/QueueConsumer.java | 3 + .../qpid/server/queue/QueueConsumerImpl.java | 42 +++++------ .../apache/qpid/server/queue/QueueEntryImpl.java | 20 ++++++ .../org/apache/qpid/server/queue/QueueRunner.java | 8 +-- .../apache/qpid/server/queue/SubFlushRunner.java | 12 ++-- .../apache/qpid/server/consumer/MockConsumer.java | 32 ++++++++- .../qpid/server/queue/MockMessageInstance.java | 6 ++ .../qpid/server/queue/StandardQueueTest.java | 5 +- .../server/protocol/v0_10/ConsumerTarget_0_10.java | 66 +++++++++-------- .../ExplicitAcceptDispositionChangeListener.java | 15 ++-- .../ImplicitAcceptDispositionChangeListener.java | 15 ++-- .../v0_10/MessageAcceptCompletionListener.java | 11 ++- .../protocol/v0_10/ServerSessionDelegate.java | 84 +++++++++++++++++----- .../qpid/server/protocol/v0_8/AMQChannel.java | 79 +++++++++++--------- .../server/protocol/v0_8/ConsumerTarget_0_8.java | 64 +++++++++-------- .../v0_8/handler/BasicConsumeMethodHandler.java | 54 +++++++++++--- .../qpid/server/protocol/v0_8/AcknowledgeTest.java | 5 +- .../protocol/v0_8/QueueBrowserUsesNoAckTest.java | 3 +- .../server/protocol/v1_0/ConsumerTarget_1_0.java | 8 +-- .../server/management/amqp/ManagementNode.java | 6 ++ .../management/amqp/ManagementNodeConsumer.java | 31 ++++---- .../server/management/amqp/ManagementResponse.java | 6 ++ .../java/org/apache/qpid/client/AMQSession.java | 29 ++++---- .../org/apache/qpid/client/AMQSession_0_10.java | 10 +-- .../org/apache/qpid/client/AMQSession_0_8.java | 12 +++- .../client/messaging/address/AddressHelper.java | 38 ++++++---- .../apache/qpid/client/messaging/address/Link.java | 2 +- .../destination/AddressBasedDestinationTest.java | 37 +++++----- 31 files changed, 516 insertions(+), 252 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index aa721e598a..192164ca6b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -20,16 +20,24 @@ */ package org.apache.qpid.server.consumer; -import org.apache.qpid.server.util.StateChangeListener; - +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.qpid.server.util.StateChangeListener; public abstract class AbstractConsumerTarget implements ConsumerTarget { private final AtomicReference _state; - private final AtomicReference> _stateListener = - new AtomicReference>(); + + private final Set> _stateChangeListeners = new + CopyOnWriteArraySet<>(); + + private final Lock _stateChangeLock = new ReentrantLock(); + protected AbstractConsumerTarget(final State initialState) { @@ -46,8 +54,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget { if(_state.compareAndSet(from, to)) { - StateChangeListener listener = _stateListener.get(); - if(listener != null) + for (StateChangeListener listener : _stateChangeListeners) { listener.stateChanged(this, from, to); } @@ -59,15 +66,38 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget } } + public final void notifyCurrentState() + { + for (StateChangeListener listener : _stateChangeListeners) + { + State state = getState(); + listener.stateChanged(this, state, state); + } + } + public final void addStateListener(StateChangeListener listener) + { + _stateChangeListeners.add(listener); + } + + @Override + public void removeStateChangeListener(final StateChangeListener listener) + { + _stateChangeListeners.remove(listener); + } + + public final boolean trySendLock() + { + return _stateChangeLock.tryLock(); + } - public final void setStateListener(StateChangeListener listener) + public final void getSendLock() { - _stateListener.set(listener); + _stateChangeLock.lock(); } - public final StateChangeListener getStateListener() + public final void releaseSendLock() { - return _stateListener.get(); + _stateChangeLock.unlock(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index b7be1bfd9b..5aef922da5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -31,6 +31,8 @@ public interface ConsumerTarget void acquisitionRemoved(MessageInstance node); + void removeStateChangeListener(StateChangeListener listener); + enum State { ACTIVE, SUSPENDED, CLOSED @@ -42,7 +44,7 @@ public interface ConsumerTarget void consumerRemoved(ConsumerImpl sub); - void setStateListener(StateChangeListener listener); + void addStateListener(StateChangeListener listener); long getUnacknowledgedBytes(); @@ -50,7 +52,7 @@ public interface ConsumerTarget AMQSessionModel getSessionModel(); - long send(MessageInstance entry, boolean batch); + long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch); void flushBatched(); @@ -65,4 +67,11 @@ public interface ConsumerTarget boolean isSuspended(); boolean close(); + + boolean trySendLock(); + + void getSendLock(); + + void releaseSendLock(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index 1bf451948d..d3518f428b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -79,6 +79,8 @@ public interface MessageInstance Filterable asFilterable(); + ConsumerImpl getAcquiringConsumer(); + public static enum State { AVAILABLE, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index 71b7636159..02cd7ff56f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.model.Consumer; @@ -52,4 +53,6 @@ public interface QueueConsumer> extends ConsumerImpl, MessageInstance.ConsumerAcquiredState getOwningState(); QueueContext getQueueContext(); + + ConsumerTarget getTarget(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index 4044c938db..60bad7bf1c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -31,8 +31,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; @@ -65,7 +63,6 @@ class QueueConsumerImpl private final AtomicBoolean _targetClosed = new AtomicBoolean(false); private final AtomicBoolean _closed = new AtomicBoolean(false); private final long _consumerNumber; - private final Lock _stateChangeLock = new ReentrantLock(); private final long _createTime = System.currentTimeMillis(); private final MessageInstance.ConsumerAcquiredState _owningState = new MessageInstance.ConsumerAcquiredState(this); private final boolean _acquires; @@ -90,6 +87,8 @@ class QueueConsumerImpl private final ConsumerTarget _target; private final SubFlushRunner _runner = new SubFlushRunner(this); + private final StateChangeListener + _listener; private volatile QueueContext _queueContext; private StateChangeListener _stateListener = new StateChangeListener() { @@ -134,17 +133,17 @@ class QueueConsumerImpl setupLogging(); - _target.setStateListener( - new StateChangeListener() - { - @Override - public void stateChanged(final ConsumerTarget object, - final ConsumerTarget.State oldState, - final ConsumerTarget.State newState) - { - targetStateChanged(oldState, newState); - } - }); + _listener = new StateChangeListener() + { + @Override + public void stateChanged(final ConsumerTarget object, + final ConsumerTarget.State oldState, + final ConsumerTarget.State newState) + { + targetStateChanged(oldState, newState); + } + }; + _target.addStateListener(_listener); } private static Map createAttributeMap(String name, FilterManager filters, EnumSet