summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-21 01:15:30 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-21 01:15:30 +0000
commit7e6f4149a73c4347475caa362f50e4e97d697e2d (patch)
treea594a4ba22e59090ce699900f5a78d0c39eaac3a
parent344ca0282a94ff5dc364a25186593249bbd478d8 (diff)
downloadqpid-python-7e6f4149a73c4347475caa362f50e4e97d697e2d.tar.gz
QPID-5567 : Move acl checks into the objects being created
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1570411 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java436
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java458
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java120
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java1
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java4
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java54
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java9
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java16
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java9
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java9
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java7
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java9
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java644
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java16
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java16
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java30
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java64
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java2
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java4
32 files changed, 723 insertions, 1247 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java
index 469a4bb9d0..1e3bafa39e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java
@@ -48,6 +48,10 @@ public class Binding
_queue = queue;
_exchange = exchange;
_arguments = arguments == null ? Collections.EMPTY_MAP : Collections.unmodifiableMap(arguments);
+
+ //Perform ACLs
+ queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this);
+
}
public UUID getId()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java
index e23eb397e1..410a0ba2af 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.consumer;
import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
public interface Consumer
@@ -48,7 +50,7 @@ public interface Consumer
AMQSessionModel getSessionModel();
- void setNoLocal(boolean noLocal);
+ MessageSource getMessageSource();
long getId();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index cb5902d234..a6aad93b27 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -119,6 +119,9 @@ public abstract class AbstractExchange implements Exchange
_id = id;
_logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
+ // check ACL
+ host.getSecurityManager().authoriseCreateExchange(this);
+
// Log Exchange creation
CurrentActor.get().message(ExchangeMessages.CREATED(getType().getType(), name, durable));
}
@@ -624,9 +627,6 @@ public abstract class AbstractExchange implements Exchange
arguments = Collections.emptyMap();
}
- //Perform ACLs
- _virtualHost.getSecurityManager().authoriseBind(AbstractExchange.this, queue, bindingKey);
-
if (id == null)
{
id = UUIDGenerator.generateBindingUUID(getName(),
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index 21586c6a4a..80aa4fa49c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -116,8 +116,6 @@ public class DefaultExchangeFactory implements ExchangeFactory
public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete)
throws AMQUnknownExchangeType
{
- // Check access
- _host.getSecurityManager().authoriseCreateExchange(autoDelete, durable, exchange, null, null, null, type);
ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(type);
if (exchType == null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
index 20e7edda92..28d926686f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index 45dc556732..f7654d63fa 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -20,441 +20,41 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.messages.SubscriptionMessages;
-import org.apache.qpid.server.logging.subjects.QueueLogSubject;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.consumer.ConsumerTarget;
-import org.apache.qpid.server.util.StateChangeListener;
-
-import java.text.MessageFormat;
-import java.util.EnumMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
+import org.apache.qpid.server.message.MessageInstance;
-class QueueConsumer<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements Consumer
+interface QueueConsumer<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends Consumer
{
- public static enum State
- {
- ACTIVE,
- SUSPENDED,
- CLOSED
- }
-
- private static final Logger _logger = Logger.getLogger(QueueConsumer.class);
- private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
- private final AtomicBoolean _closed = new AtomicBoolean(false);
- private final long _id;
- private final Lock _stateChangeLock = new ReentrantLock();
- private final long _createTime = System.currentTimeMillis();
- private final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>>(this);
- private final boolean _acquires;
- private final boolean _seesRequeues;
- private final String _consumerName;
- private final boolean _isTransient;
- private final AtomicLong _deliveredCount = new AtomicLong(0);
- private final AtomicLong _deliveredBytes = new AtomicLong(0);
- private final FilterManager _filters;
- private final Class<? extends ServerMessage> _messageClass;
- private final Object _sessionReference;
- private Q _queue;
- private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId())
- + "(UNKNOWN)"
- + "] ");
-
- static final EnumMap<ConsumerTarget.State, State> STATE_MAP =
- new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class);
-
- static
- {
- STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE);
- STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.SUSPENDED);
- STATE_MAP.put(ConsumerTarget.State.CLOSED, State.CLOSED);
- }
-
- private final T _target;
- private final SubFlushRunner _runner = new SubFlushRunner(this);
- private volatile QueueContext<E,Q,L> _queueContext;
- private StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> _stateListener = new StateChangeListener<QueueConsumer<T,E,Q,L>, State>()
- {
- public void stateChanged(QueueConsumer sub, State oldState, State newState)
- {
- CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
- }
- };
- private boolean _noLocal;
-
- QueueConsumer(final FilterManager filters,
- final Class<? extends ServerMessage> messageClass,
- final boolean acquires,
- final boolean seesRequeues,
- final String consumerName,
- final boolean isTransient,
- T target)
- {
- _messageClass = messageClass;
- _sessionReference = target.getSessionModel().getConnectionReference();
- _id = SUB_ID_GENERATOR.getAndIncrement();
- _filters = filters;
- _acquires = acquires;
- _seesRequeues = seesRequeues;
- _consumerName = consumerName;
- _isTransient = isTransient;
- _target = target;
- _target.setStateListener(
- new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
- {
- @Override
- public void stateChanged(final ConsumerTarget object,
- final ConsumerTarget.State oldState,
- final ConsumerTarget.State newState)
- {
- targetStateChanged(oldState, newState);
- }
- });
- }
-
- private void targetStateChanged(final ConsumerTarget.State oldState, final ConsumerTarget.State newState)
- {
- if(oldState != newState)
- {
- if(newState == ConsumerTarget.State.CLOSED)
- {
- if(_targetClosed.compareAndSet(false,true))
- {
- CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE());
- }
- }
- else
- {
- CurrentActor.get().message(getLogSubject(),SubscriptionMessages.STATE(newState.toString()));
- }
- }
-
- if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get())
- {
- close();
- }
- final StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> stateListener = getStateListener();
- if(stateListener != null)
- {
- stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
- }
- }
-
- public T getTarget()
- {
- return _target;
- }
-
- @Override
- public void externalStateChange()
- {
- getQueue().deliverAsync(this);
- }
-
- @Override
- public long getUnacknowledgedBytes()
- {
- return _target.getUnacknowledgedBytes();
- }
-
- @Override
- public long getUnacknowledgedMessages()
- {
- return _target.getUnacknowledgedMessages();
- }
-
- @Override
- public AMQSessionModel getSessionModel()
- {
- return _target.getSessionModel();
- }
-
- @Override
- public boolean isSuspended()
- {
- return _target.isSuspended();
- }
-
- @Override
- public void close()
- {
- if(_closed.compareAndSet(false,true))
- {
- getSendLock();
- try
- {
- _target.close();
- _target.consumerRemoved(this);
- _queue.unregisterConsumer(this);
- }
- finally
- {
- releaseSendLock();
- }
-
- }
- }
-
- void flushBatched()
- {
- _target.flushBatched();
- }
-
- void queueDeleted()
- {
- _target.queueDeleted();
- }
-
- boolean wouldSuspend(final MessageInstance msg)
- {
- return !_target.allocateCredit(msg.getMessage());
- }
-
- void restoreCredit(final MessageInstance queueEntry)
- {
- _target.restoreCredit(queueEntry.getMessage());
- }
-
- void queueEmpty()
- {
- _target.queueEmpty();
- }
-
- State getState()
- {
- return STATE_MAP.get(_target.getState());
- }
-
- public final Q getQueue()
- {
- return _queue;
- }
-
- final void setQueue(Q queue, boolean exclusive)
- {
- if(getQueue() != null)
- {
- throw new IllegalStateException("Attempt to set queue for consumer " + this + " to " + queue + "when already set to " + getQueue());
- }
- _queue = queue;
-
- String queueString = new QueueLogSubject(_queue).toLogString();
-
- _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId())
- + "("
- // queueString is [vh(/{0})/qu({1}) ] so need to trim
- // ^ ^^
- + queueString.substring(1,queueString.length() - 3)
- + ")"
- + "] ");
-
-
- if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, _logActor.getLogSubject(), SubscriptionMessages.CREATE_LOG_HIERARCHY))
- {
- final String filterLogString = getFilterLogString();
- CurrentActor.get().message(_logActor.getLogSubject(), SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
- filterLogString.length() > 0));
- }
- }
-
- protected final LogSubject getLogSubject()
- {
- return _logActor.getLogSubject();
- }
-
- final LogActor getLogActor()
- {
- return _logActor;
- }
-
-
- @Override
- public final void flush()
- {
- getQueue().flushConsumer(this);
- }
-
- boolean resend(final E entry)
- {
- return getQueue().resend(entry, this);
- }
-
- final SubFlushRunner getRunner()
- {
- return _runner;
- }
-
- public final long getId()
- {
- return _id;
- }
-
- public final StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> getStateListener()
- {
- return _stateListener;
- }
+ void flushBatched();
- public final void setStateListener(StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> listener)
- {
- _stateListener = listener;
- }
+ void queueEmpty();
- final QueueContext<E,Q,L> getQueueContext()
- {
- return _queueContext;
- }
+ boolean hasInterest(E node);
- final void setQueueContext(QueueContext<E,Q,L> queueContext)
- {
- _queueContext = queueContext;
- }
+ boolean wouldSuspend(E entry);
- public final boolean isActive()
- {
- return getState() == State.ACTIVE;
- }
+ void restoreCredit(E entry);
- public final boolean isClosed()
- {
- return getState() == State.CLOSED;
- }
+ void send(E entry, boolean batch);
- public final void setNoLocal(boolean noLocal)
- {
- _noLocal = noLocal;
- }
+ void queueDeleted();
- public final boolean hasInterest(E entry)
- {
- //check that the message hasn't been rejected
- if (entry.isRejectedBy(this))
- {
+ SubFlushRunner getRunner();
- return false;
- }
-
- if (entry.getMessage().getClass() == _messageClass)
- {
- if(_noLocal)
- {
- Object connectionRef = entry.getMessage().getConnectionReference();
- if (connectionRef != null && connectionRef == _sessionReference)
- {
- return false;
- }
- }
- }
- else
- {
- // no interest in messages we can't convert
- if(_messageClass != null && MessageConverterRegistry.getConverter(entry.getMessage().getClass(),
- _messageClass)==null)
- {
- return false;
- }
- }
- return (_filters == null) || _filters.allAllow(entry.asFilterable());
- }
-
- protected String getFilterLogString()
- {
- StringBuilder filterLogString = new StringBuilder();
- String delimiter = ", ";
- boolean hasEntries = false;
- if (_filters != null && _filters.hasFilters())
- {
- filterLogString.append(_filters.toString());
- hasEntries = true;
- }
+ Q getQueue();
- if (!acquires())
- {
- if (hasEntries)
- {
- filterLogString.append(delimiter);
- }
- filterLogString.append("Browser");
- }
+ boolean resend(E e);
- return filterLogString.toString();
- }
-
- public final boolean trySendLock()
- {
- return _stateChangeLock.tryLock();
- }
-
- public final void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public final void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
- public final long getCreateTime()
- {
- return _createTime;
- }
-
- final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState()
- {
- return _owningState;
- }
-
- public final boolean acquires()
- {
- return _acquires;
- }
-
- public final boolean seesRequeues()
- {
- return _seesRequeues;
- }
-
- public final String getName()
- {
- return _consumerName;
- }
-
- public final boolean isTransient()
- {
- return _isTransient;
- }
-
- public final long getBytesOut()
+ public static enum State
{
- return _deliveredBytes.longValue();
+ ACTIVE,
+ SUSPENDED,
+ CLOSED
}
- public final long getMessagesOut()
- {
- return _deliveredCount.longValue();
- }
+ MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState();
- final void send(final E entry, final boolean batch)
- {
- _deliveredCount.incrementAndGet();
- ServerMessage message = entry.getMessage();
- _deliveredBytes.addAndGet(message.getSize());
- _target.send(entry, batch);
- }
+ QueueContext<E,Q,L> getQueueContext();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
new file mode 100644
index 0000000000..eeeab76656
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -0,0 +1,458 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.text.MessageFormat;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
+
+class QueueConsumerImpl<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements QueueConsumer<T,E,Q,L>
+{
+
+
+ private static final Logger _logger = Logger.getLogger(QueueConsumerImpl.class);
+ private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private final long _id;
+ private final Lock _stateChangeLock = new ReentrantLock();
+ private final long _createTime = System.currentTimeMillis();
+ private final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>>(this);
+ private final boolean _acquires;
+ private final boolean _seesRequeues;
+ private final String _consumerName;
+ private final boolean _isTransient;
+ private final AtomicLong _deliveredCount = new AtomicLong(0);
+ private final AtomicLong _deliveredBytes = new AtomicLong(0);
+ private final FilterManager _filters;
+ private final Class<? extends ServerMessage> _messageClass;
+ private final Object _sessionReference;
+ private final Q _queue;
+ private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId())
+ + "(UNKNOWN)"
+ + "] ");
+
+ static final EnumMap<ConsumerTarget.State, State> STATE_MAP =
+ new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class);
+
+ static
+ {
+ STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE);
+ STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.SUSPENDED);
+ STATE_MAP.put(ConsumerTarget.State.CLOSED, State.CLOSED);
+ }
+
+ private final T _target;
+ private final SubFlushRunner _runner = new SubFlushRunner(this);
+ private volatile QueueContext<E,Q,L> _queueContext;
+ private StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> _stateListener = new StateChangeListener<QueueConsumerImpl<T,E,Q,L>, State>()
+ {
+ public void stateChanged(QueueConsumerImpl sub, State oldState, State newState)
+ {
+ CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
+ }
+ };
+ private final boolean _noLocal;
+
+ QueueConsumerImpl(final Q queue,
+ T target, final String consumerName,
+ final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ EnumSet<Option> optionSet)
+ {
+
+ _messageClass = messageClass;
+ _sessionReference = target.getSessionModel().getConnectionReference();
+ _id = SUB_ID_GENERATOR.getAndIncrement();
+ _filters = filters;
+ _acquires = optionSet.contains(Option.ACQUIRES);
+ _seesRequeues = optionSet.contains(Option.SEES_REQUEUES);
+ _consumerName = consumerName;
+ _isTransient = optionSet.contains(Option.TRANSIENT);
+ _target = target;
+ _queue = queue;
+ _noLocal = optionSet.contains(Option.NO_LOCAL);
+ setupLogging(optionSet.contains(Option.EXCLUSIVE));
+
+ // Access control
+ _queue.getVirtualHost().getSecurityManager().authoriseCreateConsumer(this);
+
+
+ _target.setStateListener(
+ new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
+ {
+ @Override
+ public void stateChanged(final ConsumerTarget object,
+ final ConsumerTarget.State oldState,
+ final ConsumerTarget.State newState)
+ {
+ targetStateChanged(oldState, newState);
+ }
+ });
+ }
+
+ private void targetStateChanged(final ConsumerTarget.State oldState, final ConsumerTarget.State newState)
+ {
+ if(oldState != newState)
+ {
+ if(newState == ConsumerTarget.State.CLOSED)
+ {
+ if(_targetClosed.compareAndSet(false,true))
+ {
+ CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE());
+ }
+ }
+ else
+ {
+ CurrentActor.get().message(getLogSubject(),SubscriptionMessages.STATE(newState.toString()));
+ }
+ }
+
+ if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get())
+ {
+ close();
+ }
+ final StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> stateListener = getStateListener();
+ if(stateListener != null)
+ {
+ stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
+ }
+ }
+
+ public T getTarget()
+ {
+ return _target;
+ }
+
+ @Override
+ public void externalStateChange()
+ {
+ getQueue().deliverAsync(this);
+ }
+
+ @Override
+ public long getUnacknowledgedBytes()
+ {
+ return _target.getUnacknowledgedBytes();
+ }
+
+ @Override
+ public long getUnacknowledgedMessages()
+ {
+ return _target.getUnacknowledgedMessages();
+ }
+
+ @Override
+ public AMQSessionModel getSessionModel()
+ {
+ return _target.getSessionModel();
+ }
+
+ @Override
+ public MessageSource getMessageSource()
+ {
+ return _queue;
+ }
+
+ @Override
+ public boolean isSuspended()
+ {
+ return _target.isSuspended();
+ }
+
+ @Override
+ public void close()
+ {
+ if(_closed.compareAndSet(false,true))
+ {
+ getSendLock();
+ try
+ {
+ _target.close();
+ _target.consumerRemoved(this);
+ _queue.unregisterConsumer(this);
+ }
+ finally
+ {
+ releaseSendLock();
+ }
+
+ }
+ }
+
+ public void flushBatched()
+ {
+ _target.flushBatched();
+ }
+
+ public void queueDeleted()
+ {
+ _target.queueDeleted();
+ }
+
+ public boolean wouldSuspend(final E msg)
+ {
+ return !_target.allocateCredit(msg.getMessage());
+ }
+
+ public void restoreCredit(final E queueEntry)
+ {
+ _target.restoreCredit(queueEntry.getMessage());
+ }
+
+ public void queueEmpty()
+ {
+ _target.queueEmpty();
+ }
+
+ State getState()
+ {
+ return STATE_MAP.get(_target.getState());
+ }
+
+ public final Q getQueue()
+ {
+ return _queue;
+ }
+
+ private void setupLogging(final boolean exclusive)
+ {
+ String queueString = new QueueLogSubject(_queue).toLogString();
+
+ _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId())
+ + "("
+ // queueString is [vh(/{0})/qu({1}) ] so need to trim
+ // ^ ^^
+ + queueString.substring(1,queueString.length() - 3)
+ + ")"
+ + "] ");
+
+
+ if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, _logActor.getLogSubject(), SubscriptionMessages.CREATE_LOG_HIERARCHY))
+ {
+ final String filterLogString = getFilterLogString();
+ CurrentActor.get().message(_logActor.getLogSubject(), SubscriptionMessages.CREATE(filterLogString, _queue.isDurable() && exclusive,
+ filterLogString.length() > 0));
+ }
+ }
+
+ protected final LogSubject getLogSubject()
+ {
+ return _logActor.getLogSubject();
+ }
+
+ final LogActor getLogActor()
+ {
+ return _logActor;
+ }
+
+
+ @Override
+ public final void flush()
+ {
+ getQueue().flushConsumer(this);
+ }
+
+ public boolean resend(final E entry)
+ {
+ return getQueue().resend(entry, this);
+ }
+
+ public final SubFlushRunner getRunner()
+ {
+ return _runner;
+ }
+
+ public final long getId()
+ {
+ return _id;
+ }
+
+ public final StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> getStateListener()
+ {
+ return _stateListener;
+ }
+
+ public final void setStateListener(StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> listener)
+ {
+ _stateListener = listener;
+ }
+
+ public final QueueContext<E,Q,L> getQueueContext()
+ {
+ return _queueContext;
+ }
+
+ final void setQueueContext(QueueContext<E,Q,L> queueContext)
+ {
+ _queueContext = queueContext;
+ }
+
+ public final boolean isActive()
+ {
+ return getState() == State.ACTIVE;
+ }
+
+ public final boolean isClosed()
+ {
+ return getState() == State.CLOSED;
+ }
+
+ public final boolean hasInterest(E entry)
+ {
+ //check that the message hasn't been rejected
+ if (entry.isRejectedBy(this))
+ {
+
+ return false;
+ }
+
+ if (entry.getMessage().getClass() == _messageClass)
+ {
+ if(_noLocal)
+ {
+ Object connectionRef = entry.getMessage().getConnectionReference();
+ if (connectionRef != null && connectionRef == _sessionReference)
+ {
+ return false;
+ }
+ }
+ }
+ else
+ {
+ // no interest in messages we can't convert
+ if(_messageClass != null && MessageConverterRegistry.getConverter(entry.getMessage().getClass(),
+ _messageClass)==null)
+ {
+ return false;
+ }
+ }
+ return (_filters == null) || _filters.allAllow(entry.asFilterable());
+ }
+
+ protected String getFilterLogString()
+ {
+ StringBuilder filterLogString = new StringBuilder();
+ String delimiter = ", ";
+ boolean hasEntries = false;
+ if (_filters != null && _filters.hasFilters())
+ {
+ filterLogString.append(_filters.toString());
+ hasEntries = true;
+ }
+
+ if (!acquires())
+ {
+ if (hasEntries)
+ {
+ filterLogString.append(delimiter);
+ }
+ filterLogString.append("Browser");
+ }
+
+ return filterLogString.toString();
+ }
+
+ public final boolean trySendLock()
+ {
+ return _stateChangeLock.tryLock();
+ }
+
+ public final void getSendLock()
+ {
+ _stateChangeLock.lock();
+ }
+
+ public final void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
+ }
+
+ public final long getCreateTime()
+ {
+ return _createTime;
+ }
+
+ public final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState()
+ {
+ return _owningState;
+ }
+
+ public final boolean acquires()
+ {
+ return _acquires;
+ }
+
+ public final boolean seesRequeues()
+ {
+ return _seesRequeues;
+ }
+
+ public final String getName()
+ {
+ return _consumerName;
+ }
+
+ public final boolean isTransient()
+ {
+ return _isTransient;
+ }
+
+ public final long getBytesOut()
+ {
+ return _deliveredBytes.longValue();
+ }
+
+ public final long getMessagesOut()
+ {
+ return _deliveredCount.longValue();
+ }
+
+ public final void send(final E entry, final boolean batch)
+ {
+ _deliveredCount.incrementAndGet();
+ ServerMessage message = entry.getMessage();
+ _deliveredBytes.addAndGet(message.getSize());
+ _target.send(entry, batch);
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 6cbbbf0cb2..45660dec37 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -18,7 +18,6 @@
*/
package org.apache.qpid.server.queue;
-import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.Principal;
import java.util.*;
@@ -192,8 +191,19 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA
Map<String, Object> attributes,
QueueEntryListFactory<E, Q, L> entryListFactory)
{
+ if (virtualHost == null)
+ {
+ throw new IllegalArgumentException("Virtual Host must not be null");
+ }
+
UUID id = MapValueConverter.getUUIDAttribute(Queue.ID, attributes);
String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes);
+
+ if (name == null)
+ {
+ throw new IllegalArgumentException("Queue name must not be null");
+ }
+
boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE,attributes,false);
@@ -206,6 +216,30 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA
attributes,
LifetimePolicy.PERMANENT);
+
+ _name = name;
+ _durable = durable;
+ _virtualHost = virtualHost;
+ _entries = entryListFactory.createQueueEntryList((Q) this);
+ final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes);
+
+ arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy);
+ arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy);
+
+ _arguments = Collections.synchronizedMap(arguments);
+ _description = MapValueConverter.getStringAttribute(Queue.DESCRIPTION, attributes, null);
+
+ _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false);
+
+
+ _id = id;
+ _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
+
+ _logSubject = new QueueLogSubject(this);
+ _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
+
+ virtualHost.getSecurityManager().authoriseCreateQueue(this);
+
Subject activeSubject = Subject.getSubject(AccessController.getContext());
Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
AMQSessionModel<?,?> sessionModel;
@@ -294,36 +328,7 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA
}
}
- if (name == null)
- {
- throw new IllegalArgumentException("Queue name must not be null");
- }
-
- if (virtualHost == null)
- {
- throw new IllegalArgumentException("Virtual Host must not be null");
- }
-
- _name = name;
- _durable = durable;
- _virtualHost = virtualHost;
- _entries = entryListFactory.createQueueEntryList((Q) this);
- final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes);
- arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy);
- arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy);
-
- _arguments = Collections.synchronizedMap(arguments);
- _description = MapValueConverter.getStringAttribute(Queue.DESCRIPTION, attributes, null);
-
- _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false);
-
-
- _id = id;
- _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
-
- _logSubject = new QueueLogSubject(this);
- _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE))
@@ -593,40 +598,38 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA
ConsumerAccessRefused
{
- // Access control
- getVirtualHost().getSecurityManager().authoriseConsume(this);
-
if (hasExclusiveConsumer())
{
throw new ExistingExclusiveConsumer();
}
+ Object exclusiveOwner = _exclusiveOwner;
switch(_exclusivityPolicy)
{
case CONNECTION:
- if(_exclusiveOwner == null)
+ if(exclusiveOwner == null)
{
- _exclusiveOwner = target.getSessionModel().getConnectionModel();
+ exclusiveOwner = target.getSessionModel().getConnectionModel();
addExclusivityConstraint(target.getSessionModel().getConnectionModel());
}
else
{
- if(_exclusiveOwner != target.getSessionModel().getConnectionModel())
+ if(exclusiveOwner != target.getSessionModel().getConnectionModel())
{
throw new ConsumerAccessRefused();
}
}
break;
case SESSION:
- if(_exclusiveOwner == null)
+ if(exclusiveOwner == null)
{
- _exclusiveOwner = target.getSessionModel();
+ exclusiveOwner = target.getSessionModel();
addExclusivityConstraint(target.getSessionModel());
}
else
{
- if(_exclusiveOwner != target.getSessionModel())
+ if(exclusiveOwner != target.getSessionModel())
{
throw new ConsumerAccessRefused();
}
@@ -639,26 +642,26 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA
}
break;
case PRINCIPAL:
- if(_exclusiveOwner == null)
+ if(exclusiveOwner == null)
{
- _exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+ exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
}
else
{
- if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal()))
+ if(!exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal()))
{
throw new ConsumerAccessRefused();
}
}
break;
case CONTAINER:
- if(_exclusiveOwner == null)
+ if(exclusiveOwner == null)
{
- _exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName();
+ exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName();
}
else
{
- if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName()))
+ if(!exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName()))
{
throw new ConsumerAccessRefused();
}
@@ -673,15 +676,24 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA
boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE);
boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT);
+ if(_noLocal && !optionSet.contains(Consumer.Option.NO_LOCAL))
+ {
+ optionSet = EnumSet.copyOf(optionSet);
+ optionSet.add(Consumer.Option.NO_LOCAL);
+ }
+
if(exclusive && getConsumerCount() != 0)
{
throw new ExistingConsumerPreventsExclusive();
}
- QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass,
- optionSet.contains(Consumer.Option.ACQUIRES),
- optionSet.contains(Consumer.Option.SEES_REQUEUES),
- consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target);
+ QueueConsumerImpl<T,E,Q,L> consumer = new QueueConsumerImpl<T,E,Q,L>((Q)this,
+ target,
+ consumerName,
+ filters, messageClass,
+ optionSet);
+
+ _exclusiveOwner = exclusiveOwner;
target.consumerAdded(consumer);
@@ -700,12 +712,6 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA
if (!isDeleted())
{
- consumer.setQueue((Q)this, exclusive);
- if(_noLocal)
- {
- consumer.setNoLocal(true);
- }
-
synchronized (_consumerListeners)
{
for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
@@ -732,7 +738,7 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA
}
- synchronized void unregisterConsumer(final QueueConsumer<?,E,Q,L> consumer)
+ synchronized void unregisterConsumer(final QueueConsumerImpl<?,E,Q,L> consumer)
{
if (consumer == null)
{
@@ -1021,7 +1027,7 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA
{
return true;
}
- QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry);
+ QueueConsumer<?,E,Q,L> assigned = _messageGroupManager.getAssignedConsumer(entry);
return (assigned == null) || (assigned == sub);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
index c35c41b4f7..cba992f347 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
index 2100e3c5cd..3026f33604 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
@@ -36,7 +36,7 @@ class SubFlushRunner implements Runnable
private static final Logger _logger = Logger.getLogger(SubFlushRunner.class);
- private final QueueConsumer _sub;
+ private final QueueConsumerImpl _sub;
private static int IDLE = 0;
private static int SCHEDULED = 1;
@@ -49,7 +49,7 @@ class SubFlushRunner implements Runnable
private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
private final AtomicBoolean _stateChange = new AtomicBoolean();
- public SubFlushRunner(QueueConsumer sub)
+ public SubFlushRunner(QueueConsumerImpl sub)
{
_sub = sub;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
index 85be4c6a3d..8dd8dda220 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
@@ -20,15 +20,14 @@ package org.apache.qpid.server.security;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.model.AccessControlProvider;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.ConfigurationChangeListener;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.*;
import org.apache.qpid.server.plugin.AccessControlFactory;
import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.FileAccessControlProviderConstants;
import org.apache.qpid.server.security.access.ObjectProperties;
@@ -253,20 +252,24 @@ public class SecurityManager implements ConfigurationChangeListener
return true;
}
- public void authoriseBind(final Exchange exch, final AMQQueue queue, final String routingKey)
+ public void authoriseCreateBinding(Binding binding)
{
+ final Exchange exch = binding.getExchange();
+ final AMQQueue queue = binding.getQueue();
+ final String bindingKey = binding.getBindingKey();
+
boolean allowed =
checkAllPlugins(new AccessCheck()
{
Result allowed(AccessControl plugin)
{
- return plugin.authorise(BIND, EXCHANGE, new ObjectProperties(exch, queue, routingKey));
+ return plugin.authorise(BIND, EXCHANGE, new ObjectProperties(exch, queue, bindingKey));
}
});
if(!allowed)
{
- throw new AccessControlException("Permission denied: binding " + routingKey);
+ throw new AccessControlException("Permission denied: binding " + bindingKey);
}
}
@@ -306,7 +309,7 @@ public class SecurityManager implements ConfigurationChangeListener
}
}
- public void accessVirtualhost(final String vhostname)
+ public void authoriseCreateConnection(final AMQConnectionModel connection)
{
if(!checkAllPlugins(new AccessCheck()
{
@@ -316,12 +319,15 @@ public class SecurityManager implements ConfigurationChangeListener
}
}))
{
- throw new AccessControlException("Permission denied: " + vhostname);
+ throw new AccessControlException("Permission denied: " + connection.getVirtualHostName());
}
}
- public void authoriseConsume(final AMQQueue queue)
+ public void authoriseCreateConsumer(final Consumer consumer)
{
+ // TODO
+ final AMQQueue queue = (AMQQueue) consumer.getMessageSource();
+
if(!checkAllPlugins(new AccessCheck()
{
Result allowed(AccessControl plugin)
@@ -334,20 +340,17 @@ public class SecurityManager implements ConfigurationChangeListener
}
}
- public void authoriseCreateExchange(final Boolean autoDelete,
- final Boolean durable,
- final String exchangeName,
- final Boolean internal,
- final Boolean nowait,
- final Boolean passive,
- final String exchangeType)
+ public void authoriseCreateExchange(final Exchange exchange)
{
+ final String exchangeName = exchange.getName();
if(!checkAllPlugins(new AccessCheck()
{
Result allowed(AccessControl plugin)
{
- return plugin.authorise(CREATE, EXCHANGE, new ObjectProperties(autoDelete, durable, exchangeName,
- internal, nowait, passive, exchangeType));
+ return plugin.authorise(CREATE, EXCHANGE, new ObjectProperties(exchange.isAutoDelete(),
+ exchange.isDurable(),
+ exchangeName,
+ exchange.getTypeName()));
}
}))
{
@@ -355,14 +358,18 @@ public class SecurityManager implements ConfigurationChangeListener
}
}
- public void authoriseCreateQueue(final Boolean autoDelete, final Boolean durable, final Boolean exclusive,
- final Boolean nowait, final Boolean passive, final String queueName, final String owner)
+ public void authoriseCreateQueue(final AMQQueue queue)
{
+ final String queueName = queue.getName();
if(! checkAllPlugins(new AccessCheck()
{
Result allowed(AccessControl plugin)
{
- return plugin.authorise(CREATE, QUEUE, new ObjectProperties(autoDelete, durable, exclusive, nowait, passive, queueName, owner));
+ return plugin.authorise(CREATE, QUEUE, new ObjectProperties(queue.getAttribute(Queue.LIFETIME_POLICY) != LifetimePolicy.PERMANENT,
+ Boolean.TRUE.equals(queue.getAttribute(Queue.DURABLE)),
+ queue.getAttribute(Queue.EXCLUSIVE) != ExclusivityPolicy.NONE,
+ queueName,
+ queue.getOwner()));
}
}))
{
@@ -370,6 +377,7 @@ public class SecurityManager implements ConfigurationChangeListener
}
}
+
public void authoriseDelete(final AMQQueue queue)
{
if(!checkAllPlugins(new AccessCheck()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
index b9e0e5920f..7f136a953f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
@@ -182,7 +182,7 @@ public class ObjectProperties
}
public ObjectProperties(Boolean autoDelete, Boolean durable, String exchangeName,
- Boolean internal, Boolean nowait, Boolean passive, String exchangeType)
+ String exchangeType)
{
super();
@@ -191,14 +191,11 @@ public class ObjectProperties
put(Property.AUTO_DELETE, autoDelete);
put(Property.TEMPORARY, autoDelete);
put(Property.DURABLE, durable);
- put(Property.INTERNAL, internal);
- put(Property.NO_WAIT, nowait);
- put(Property.PASSIVE, passive);
put(Property.TYPE, exchangeType);
}
- public ObjectProperties(Boolean autoDelete, Boolean durable, Boolean exclusive, Boolean nowait, Boolean passive,
- String queueName, String owner)
+ public ObjectProperties(Boolean autoDelete, Boolean durable, Boolean exclusive,
+ String queueName, String owner)
{
super();
@@ -208,8 +205,6 @@ public class ObjectProperties
put(Property.TEMPORARY, autoDelete);
put(Property.DURABLE, durable);
put(Property.EXCLUSIVE, exclusive);
- put(Property.NO_WAIT, nowait);
- put(Property.PASSIVE, passive);
put(Property.OWNER, owner);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index feb619f01a..190226f33c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -550,15 +550,6 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
ExclusivityPolicy exclusive = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,Queue.EXCLUSIVE, attributes, ExclusivityPolicy.NONE);
String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
- // Access check
- getSecurityManager().authoriseCreateQueue(autoDelete,
- durable,
- exclusive != null && exclusive != ExclusivityPolicy.NONE,
- null,
- null,
- queueName,
- owner);
-
synchronized (_queueRegistry)
{
if(_queueRegistry.getQueue(queueName) != null)
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 8919599cba..1822992644 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -25,12 +25,17 @@ import junit.framework.TestCase;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.security.*;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
*/
public class HeadersBindingTest extends TestCase
@@ -73,7 +78,7 @@ public class HeadersBindingTest extends TestCase
public String getEncoding()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public byte getPriority()
@@ -131,12 +136,15 @@ public class HeadersBindingTest extends TestCase
private Map<String,Object> bindHeaders = new HashMap<String,Object>();
private MockHeader matchHeaders = new MockHeader();
private int _count = 0;
- private MockAMQQueue _queue;
+ private AMQQueue _queue;
protected void setUp()
{
_count++;
- _queue = new MockAMQQueue(getQueueName());
+ _queue = mock(AMQQueue.class);
+ VirtualHost vhost = mock(VirtualHost.class);
+ when(_queue.getVirtualHost()).thenReturn(vhost);
+ when(vhost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
}
protected String getQueueName()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
index 9acaadf9e1..f083d2ba5d 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
@@ -22,10 +22,12 @@ package org.apache.qpid.server.logging.subjects;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* Validate BindingLogSubjects are logged as expected
*/
@@ -45,8 +47,9 @@ public class BindingLogSubjectTest extends AbstractTestLogSubject
_testVhost = BrokerTestHelper.createVirtualHost("test");
_routingKey = "RoutingKey";
_exchange = _testVhost.getExchange("amq.direct");
- _queue = new MockAMQQueue("BindingLogSubjectTest");
- ((MockAMQQueue) _queue).setVirtualHost(_testVhost);
+ _queue = mock(AMQQueue.class);
+ when(_queue.getName()).thenReturn("BindingLogSubjectTest");
+ when(_queue.getVirtualHost()).thenReturn(_testVhost);
_subject = new BindingLogSubject(_routingKey, _exchange, _queue);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
index f89959febb..ccd694837e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
@@ -21,10 +21,12 @@
package org.apache.qpid.server.logging.subjects;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* Validate QueueLogSubjects are logged as expected
*/
@@ -41,8 +43,9 @@ public class QueueLogSubjectTest extends AbstractTestLogSubject
_testVhost = BrokerTestHelper.createVirtualHost("test");
- _queue = new MockAMQQueue("QueueLogSubjectTest");
- ((MockAMQQueue) _queue).setVirtualHost(_testVhost);
+ _queue = mock(AMQQueue.class);
+ when(_queue.getName()).thenReturn("QueueLogSubjectTest");
+ when(_queue.getVirtualHost()).thenReturn(_testVhost);
_subject = new QueueLogSubject(_queue);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index 5f1e88dba9..57f7cf9729 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -43,6 +43,8 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.security.*;
+import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -66,6 +68,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
_queues = new ArrayList<AMQQueue>();
_virtualHost = mock(VirtualHost.class);
+ when(_virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class));
VirtualHostConfiguration vhostConfig = mock(VirtualHostConfiguration.class);
when(_virtualHost.getConfiguration()).thenReturn(vhostConfig);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
index e51b575895..e77125bb0e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
@@ -27,6 +27,8 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.security.*;
+import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.HashMap;
@@ -52,7 +54,10 @@ public class ConflationQueueListTest extends TestCase
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.LVQ_KEY, CONFLATION_KEY);
- _queue = new ConflationQueue(mock(VirtualHost.class), queueAttributes);
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class));
+
+ _queue = new ConflationQueue(virtualHost, queueAttributes);
_list = _queue.getEntries();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
index 35508bb2c4..dc920bf555 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -55,9 +56,11 @@ public class ConsumerListTest extends QpidTestCase
private QueueConsumer newMockConsumer()
{
- ConsumerTarget target = mock(ConsumerTarget.class);
- when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class));
- return new QueueConsumer(null,null,true,true,"sub",false,target);
+ QueueConsumer consumer = mock(QueueConsumer.class);
+ MessageInstance.ConsumerAcquiredState owningState = new QueueEntryImpl.ConsumerAcquiredState(consumer);
+ when(consumer.getOwningState()).thenReturn(owningState);
+
+ return consumer;
}
/**
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
deleted file mode 100644
index 934d60a23f..0000000000
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ /dev/null
@@ -1,644 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.ExclusivityPolicy;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.consumer.Consumer;
-import org.apache.qpid.server.consumer.ConsumerTarget;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-public class MockAMQQueue implements AMQQueue
-{
- private boolean _deleted = false;
- private String _name;
- private VirtualHost _virtualhost;
-
- private AuthorizationHolder _authorizationHolder;
-
- private AMQSessionModel _exclusiveOwner;
- private List<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
- private boolean _autoDelete;
-
- public MockAMQQueue(String name)
- {
- _name = name;
- }
-
- @Override
- public void setExclusivityPolicy(final ExclusivityPolicy desiredPolicy)
- throws ExistingConsumerPreventsExclusive
- {
-
- }
-
- public void addBinding(final Binding binding)
- {
- _bindings.add(binding);
- }
-
- public void removeBinding(final Binding binding)
- {
- _bindings.remove(binding);
- }
-
- public List<Binding> getBindings()
- {
- return _bindings;
- }
-
- public int getBindingCount()
- {
- return 0;
- }
-
- public LogSubject getLogSubject()
- {
- return new LogSubject()
- {
- public String toLogString()
- {
- return "[MockAMQQueue]";
- }
-
- };
- }
-
- public long getUnackedMessageBytes()
- {
- return 0;
- }
-
- public long getMessageDequeueCount()
- {
- return 0;
- }
-
- public long getTotalEnqueueSize()
- {
- return 0;
- }
-
- public long getTotalDequeueSize()
- {
- return 0;
- }
-
- public long getTotalDequeueCount()
- {
- return 0;
- }
-
- public long getTotalEnqueueCount()
- {
- return 0;
- }
-
- @Override
- public LifetimePolicy getLifetimePolicy()
- {
- return null;
- }
-
- public int getBindingCountHigh()
- {
- return 0;
- }
-
- public long getPersistentByteEnqueues()
- {
- return 0;
- }
-
- public long getPersistentByteDequeues()
- {
- return 0;
- }
-
- public long getPersistentMsgEnqueues()
- {
- return 0;
- }
-
- public long getPersistentMsgDequeues()
- {
- return 0;
- }
-
- public void purge(final long request)
- {
-
- }
-
- public long getCreateTime()
- {
- return 0;
- }
-
- public void setNoLocal(boolean b)
- {
-
- }
-
- public UUID getId()
- {
- return null;
- }
-
-
-
- public boolean isDurable()
- {
- return false;
- }
-
- public boolean isAutoDelete()
- {
- return _autoDelete;
- }
-
- public void setAutoDelete(boolean autodelete)
- {
- _autoDelete = autodelete;
- }
-
-
- public String getOwner()
- {
- return null;
- }
-
- public void setVirtualHost(VirtualHost virtualhost)
- {
- _virtualhost = virtualhost;
- }
-
- public VirtualHost getVirtualHost()
- {
- return _virtualhost;
- }
-
- @Override
- public boolean resend(final QueueEntry entry, final Consumer consumer)
- {
- return false;
- }
-
- @Override
- public void addDeleteTask(final Action task)
- {
-
- }
-
- @Override
- public void enqueue(final ServerMessage message, final Action action)
- {
-
- }
-
- @Override
- public int compareTo(final Object o)
- {
- return 0;
- }
-
- @Override
- public Consumer addConsumer(final ConsumerTarget target,
- final FilterManager filters,
- final Class messageClass,
- final String consumerName,
- final EnumSet options)
- {
- return new QueueConsumer(filters, messageClass, options.contains(Consumer.Option.ACQUIRES),
- options.contains(Consumer.Option.SEES_REQUEUES), consumerName,
- options.contains(Consumer.Option.TRANSIENT), target );
- }
-
-
- public String getName()
- {
- return _name;
- }
-
- public int send(final ServerMessage message,
- final InstanceProperties instanceProperties,
- final ServerTransaction txn,
- final Action postEnqueueAction)
- {
- return 0;
- }
-
- public Collection<QueueConsumer> getConsumers()
- {
- return Collections.emptyList();
- }
-
- public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener)
- {
-
- }
-
- public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
- {
-
- }
-
- public int getConsumerCount()
- {
- return 0;
- }
-
- public int getActiveConsumerCount()
- {
- return 0;
- }
-
- public boolean hasExclusiveConsumer()
- {
- return false;
- }
-
- public boolean isUnused()
- {
- return false;
- }
-
- public boolean isEmpty()
- {
- return false;
- }
-
- public int getMessageCount()
- {
- return 0;
- }
-
- public int getUndeliveredMessageCount()
- {
- return 0;
- }
-
- public long getQueueDepth()
- {
- return 0;
- }
-
- public long getReceivedMessageCount()
- {
- return 0;
- }
-
- public long getOldestMessageArrivalTime()
- {
- return 0;
- }
-
- public boolean isDeleted()
- {
- return _deleted;
- }
-
- public int delete()
- {
- _deleted = true;
- return getMessageCount();
- }
-
-
- public void requeue(QueueEntry entry)
- {
- }
-
- public void dequeue(QueueEntry entry)
- {
- }
-
- @Override
- public void removeDeleteTask(final Action task)
- {
-
- }
-
- @Override
- public void decrementUnackedMsgCount(final QueueEntry queueEntry)
- {
-
- }
-
- @Override
- public List getMessagesOnTheQueue()
- {
- return null;
- }
-
- public List getMessagesOnTheQueue(long fromMessageId, long toMessageId)
- {
- return null;
- }
-
- public List<Long> getMessagesOnTheQueue(int num)
- {
- return null;
- }
-
- public List<Long> getMessagesOnTheQueue(int num, int offset)
- {
- return null;
- }
-
- public QueueEntry getMessageOnTheQueue(long messageId)
- {
- return null;
- }
-
- public List getMessagesRangeOnTheQueue(long fromPosition, long toPosition)
- {
- return null;
- }
-
- public long getMaximumMessageSize()
- {
- return 0;
- }
-
- public void setMaximumMessageSize(long value)
- {
-
- }
-
- public long getMaximumMessageCount()
- {
- return 0;
- }
-
- public void setMaximumMessageCount(long value)
- {
-
- }
-
- public long getMaximumQueueDepth()
- {
- return 0;
- }
-
- public void setMaximumQueueDepth(long value)
- {
-
- }
-
- public long getMaximumMessageAge()
- {
- return 0;
- }
-
- public void setMaximumMessageAge(long maximumMessageAge)
- {
-
- }
-
- public long getMinimumAlertRepeatGap()
- {
- return 0;
- }
-
- public long clearQueue()
- {
- return 0;
- }
-
-
- public void checkMessageStatus()
- {
-
- }
-
- public Set<NotificationCheck> getNotificationChecks()
- {
- return null;
- }
-
- public void flushConsumer(Consumer sub)
- {
-
- }
-
- public void deliverAsync(Consumer sub)
- {
-
- }
-
- public void deliverAsync()
- {
-
- }
-
- public void stop()
- {
-
- }
-
- public boolean isExclusive()
- {
- return false;
- }
-
- @Override
- public boolean verifySessionAccess(final AMQSessionModel session)
- {
- return false;
- }
-
- public Exchange getAlternateExchange()
- {
- return null;
- }
-
- public void setAlternateExchange(Exchange exchange)
- {
-
- }
-
- @Override
- public Collection<String> getAvailableAttributes()
- {
- return null;
- }
-
- @Override
- public Object getAttribute(String attrName)
- {
- return null;
- }
-
- public void checkCapacity(AMQSessionModel channel)
- {
- }
-
- public int compareTo(AMQQueue o)
- {
- return 0;
- }
-
- public void setMinimumAlertRepeatGap(long value)
- {
-
- }
-
- public long getCapacity()
- {
- return 0;
- }
-
- public void setCapacity(long capacity)
- {
-
- }
-
- public long getFlowResumeCapacity()
- {
- return 0;
- }
-
- public void setFlowResumeCapacity(long flowResumeCapacity)
- {
-
- }
-
- public void configure(QueueConfiguration config)
- {
-
- }
-
- public AuthorizationHolder getAuthorizationHolder()
- {
- return _authorizationHolder;
- }
-
- public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder)
- {
- _authorizationHolder = authorizationHolder;
- }
-
- public AMQSessionModel getExclusiveOwningSession()
- {
- return _exclusiveOwner;
- }
-
- public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner)
- {
- _exclusiveOwner = exclusiveOwner;
- }
-
- public boolean isOverfull()
- {
- return false;
- }
-
- public int getConsumerCountHigh()
- {
- return 0;
- }
-
- public long getByteTxnEnqueues()
- {
- return 0;
- }
-
- public long getMsgTxnEnqueues()
- {
- return 0;
- }
-
- public long getByteTxnDequeues()
- {
- return 0;
- }
-
- public long getMsgTxnDequeues()
- {
- return 0;
- }
-
-
- public long getUnackedMessageCount()
- {
- return 0;
- }
-
- public long getUnackedMessageCountHigh()
- {
- return 0;
- }
-
- public void setExclusive(boolean exclusive)
- {
- }
-
- public int getMaximumDeliveryCount()
- {
- return 0;
- }
-
- public void setMaximumDeliveryCount(int maximumDeliveryCount)
- {
- }
-
- public void visit(final QueueEntryVisitor visitor)
- {
- }
-
- @Override
- public void setNotificationListener(NotificationListener listener)
- {
- }
-
- @Override
- public void setDescription(String description)
- {
- }
-
- @Override
- public String getDescription()
- {
- return null;
- }
-
-}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
index 8b2ee70b26..44d12f2763 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
@@ -27,6 +27,8 @@ import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
+
+import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -51,7 +53,9 @@ public class PriorityQueueListTest extends QpidTestCase
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.PRIORITIES, 10);
- PriorityQueue queue = new PriorityQueue(mock(VirtualHost.class), queueAttributes);
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class));
+ PriorityQueue queue = new PriorityQueue(virtualHost, queueAttributes);
_list = queue.getEntries();
for (int i = 0; i < PRIORITIES.length; i++)
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index dc91be9cdb..fbebdb6008 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -24,11 +24,13 @@ import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageInstance.EntryState;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.security.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.lang.reflect.Field;
@@ -48,6 +50,7 @@ public abstract class QueueEntryImplTestBase extends TestCase
protected QueueEntryImpl _queueEntry;
protected QueueEntryImpl _queueEntry2;
protected QueueEntryImpl _queueEntry3;
+ private long _consumerId;
public abstract QueueEntryImpl getQueueEntryImpl(int msgId);
@@ -136,9 +139,11 @@ public abstract class QueueEntryImplTestBase extends TestCase
private QueueConsumer newConsumer()
{
- final ConsumerTarget target = mock(ConsumerTarget.class);
- when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class));
- final QueueConsumer consumer = new QueueConsumer(null,null,true,true,"mock",false,target);
+ final QueueConsumer consumer = mock(QueueConsumer.class);
+
+ MessageInstance.ConsumerAcquiredState owningState = new QueueEntryImpl.ConsumerAcquiredState(consumer);
+ when(consumer.getOwningState()).thenReturn(owningState);
+ when(consumer.getId()).thenReturn(_consumerId++);
return consumer;
}
@@ -204,7 +209,10 @@ public abstract class QueueEntryImplTestBase extends TestCase
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
- StandardQueue queue = new StandardQueue(mock(VirtualHost.class), queueAttributes);
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
+
+ StandardQueue queue = new StandardQueue(virtualHost, queueAttributes);
OrderedQueueEntryList queueEntryList = queue.getEntries();
// create test entries
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index 9813ab98b5..87cda34cc9 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.security.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.HashMap;
@@ -44,7 +45,10 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, "SimpleQueueEntryImplTest");
- StandardQueue queue = new StandardQueue(mock(VirtualHost.class), queueAttributes);
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
+
+ StandardQueue queue = new StandardQueue(virtualHost, queueAttributes);
queueEntryList = queue.getEntries();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
index be2948c18d..6dfc067db7 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.security.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Arrays;
@@ -86,7 +87,10 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase<SortedQueue
attributes.put(Queue.SORT_KEY, "KEY");
// Create test list
- _testQueue = new SortedQueue(mock(VirtualHost.class), attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
+
+ _testQueue = new SortedQueue(virtualHost, attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
{
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
index 2f1122fa9a..c6bc126510 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.security.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
import static org.mockito.Matchers.eq;
@@ -54,7 +55,10 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase
attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
attributes.put(Queue.SORT_KEY, "KEY");
- SortedQueue queue = new SortedQueue(mock(VirtualHost.class), attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
+
+ SortedQueue queue = new SortedQueue(virtualHost, attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
{
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
index 5da991b8dd..302c1f1870 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
@@ -20,9 +20,14 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.security.*;
+import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.HashMap;
@@ -50,7 +55,12 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase<StandardQ
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
- _testQueue = new StandardQueue(mock(VirtualHost.class), queueAttributes);
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
+ final LogActor logActor = mock(LogActor.class);
+ when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
+ CurrentActor.set(logActor);
+ _testQueue = new StandardQueue(virtualHost, queueAttributes);
_sqel = _testQueue.getEntries();
for(int i = 1; i <= 100; i++)
@@ -93,7 +103,9 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase<StandardQ
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
- StandardQueue queue = new StandardQueue(mock(VirtualHost.class), queueAttributes);
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class));
+ StandardQueue queue = new StandardQueue(virtualHost, queueAttributes);
return queue.getEntries();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index c5c68ecf45..96d9ee429d 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -47,6 +47,7 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.Transaction.Record;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -103,6 +104,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
_queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class);
_dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
_virtualHost = mock(VirtualHost.class);
+ when(_virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
@@ -362,6 +364,9 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
when(queue.isExclusive()).thenReturn(exclusive);
when(queue.getId()).thenReturn(_queueId);
when(queue.getAlternateExchange()).thenReturn(alternateExchange);
+ final org.apache.qpid.server.virtualhost.VirtualHost vh = mock(org.apache.qpid.server.virtualhost.VirtualHost.class);
+ when(vh.getSecurityManager()).thenReturn(mock(SecurityManager.class));
+ when(queue.getVirtualHost()).thenReturn(vh);
final Map<String,Object> attributes = arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments);
attributes.put(Queue.NAME, queueName);
if(exclusive)
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
index 993e9ee4a8..3e1183d203 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
@@ -22,8 +22,7 @@ package org.apache.qpid.server.txn;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.MockMessageInstance;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -34,6 +33,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* A unit test ensuring that AutoCommitTransaction creates a separate transaction for
* each dequeue/enqueue operation that involves enlistable messages. Verifies
@@ -46,8 +48,8 @@ public class AutoCommitTransactionTest extends QpidTestCase
private ServerTransaction _transaction = null; // Class under test
private MessageStore _transactionLog;
- private AMQQueue _queue;
- private List<AMQQueue> _queues;
+ private BaseQueue _queue;
+ private List<BaseQueue> _queues;
private Collection<MessageInstance> _queueEntries;
private ServerMessage _message;
private MockAction _action;
@@ -382,7 +384,7 @@ public class AutoCommitTransactionTest extends QpidTestCase
for(int i = 0; i < queueDurableFlags.length; i++)
{
- final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]);
+ final BaseQueue queue = createTestAMQQueue(queueDurableFlags[i]);
final ServerMessage message = createTestMessage(messagePersistentFlags[i]);
queueEntries.add(new MockMessageInstance()
@@ -411,9 +413,9 @@ public class AutoCommitTransactionTest extends QpidTestCase
return new MockStoreTransaction(throwException);
}
- private List<AMQQueue> createTestBaseQueues(boolean[] durableFlags)
+ private List<BaseQueue> createTestBaseQueues(boolean[] durableFlags)
{
- List<AMQQueue> queues = new ArrayList<AMQQueue>();
+ List<BaseQueue> queues = new ArrayList<BaseQueue>();
for (boolean b: durableFlags)
{
queues.add(createTestAMQQueue(b));
@@ -422,17 +424,11 @@ public class AutoCommitTransactionTest extends QpidTestCase
return queues;
}
- private AMQQueue createTestAMQQueue(final boolean durable)
+ private BaseQueue createTestAMQQueue(final boolean durable)
{
- return new MockAMQQueue("mockQueue")
- {
- @Override
- public boolean isDurable()
- {
- return durable;
- }
-
- };
+ BaseQueue queue = mock(BaseQueue.class);
+ when(queue.isDurable()).thenReturn(durable);
+ return queue;
}
private ServerMessage createTestMessage(final boolean persistent)
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
index bdfdb55c7e..58c7401c60 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
@@ -22,8 +22,7 @@ package org.apache.qpid.server.txn;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.MockMessageInstance;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -34,6 +33,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* A unit test ensuring that LocalTransactionTest creates a long-lived store transaction
* that spans many dequeue/enqueue operations of enlistable messages. Verifies
@@ -45,8 +47,8 @@ public class LocalTransactionTest extends QpidTestCase
{
private ServerTransaction _transaction = null; // Class under test
- private AMQQueue _queue;
- private List<AMQQueue> _queues;
+ private BaseQueue _queue;
+ private List<BaseQueue> _queues;
private Collection<MessageInstance> _queueEntries;
private ServerMessage _message;
private MockAction _action1;
@@ -77,7 +79,7 @@ public class LocalTransactionTest extends QpidTestCase
public void testEnqueueToNonDurableQueueOfNonPersistentMessage() throws Exception
{
_message = createTestMessage(false);
- _queue = createTestAMQQueue(false);
+ _queue = createQueue(false);
_transaction.enqueue(_queue, _message, _action1);
@@ -93,7 +95,7 @@ public class LocalTransactionTest extends QpidTestCase
public void testEnqueueToDurableQueueOfPersistentMessage() throws Exception
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_transaction.enqueue(_queue, _message, _action1);
@@ -109,7 +111,7 @@ public class LocalTransactionTest extends QpidTestCase
public void testStoreEnqueueCausesException() throws Exception
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_storeTransaction = createTestStoreTransaction(true);
_transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction);
@@ -217,7 +219,7 @@ public class LocalTransactionTest extends QpidTestCase
public void testDequeueFromNonDurableQueueOfNonPersistentMessage() throws Exception
{
_message = createTestMessage(false);
- _queue = createTestAMQQueue(false);
+ _queue = createQueue(false);
_transaction.dequeue(_queue, _message, _action1);
@@ -234,7 +236,7 @@ public class LocalTransactionTest extends QpidTestCase
public void testDequeueFromDurableQueueOfPersistentMessage() throws Exception
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_transaction.dequeue(_queue, _message, _action1);
@@ -250,7 +252,7 @@ public class LocalTransactionTest extends QpidTestCase
public void testStoreDequeueCausesException() throws Exception
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_storeTransaction = createTestStoreTransaction(true);
_transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction);
@@ -396,7 +398,7 @@ public class LocalTransactionTest extends QpidTestCase
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
assertFalse("Post commit action must not be fired yet", _action1.isPostCommitActionFired());
@@ -419,7 +421,7 @@ public class LocalTransactionTest extends QpidTestCase
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -445,7 +447,7 @@ public class LocalTransactionTest extends QpidTestCase
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_transaction.addPostTransactionAction(_action1);
_transaction.dequeue(_queue, _message, _action2);
@@ -467,7 +469,7 @@ public class LocalTransactionTest extends QpidTestCase
public void testRollbackWorkWithAdditionalPostAction() throws Exception
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_transaction.addPostTransactionAction(_action1);
_transaction.dequeue(_queue, _message, _action2);
@@ -488,7 +490,7 @@ public class LocalTransactionTest extends QpidTestCase
assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
long startTime = System.currentTimeMillis();
_transaction.enqueue(_queue, _message, _action1);
@@ -503,7 +505,7 @@ public class LocalTransactionTest extends QpidTestCase
assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_transaction.enqueue(_queue, _message, _action1);
@@ -526,7 +528,7 @@ public class LocalTransactionTest extends QpidTestCase
assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
long startTime = System.currentTimeMillis();
_transaction.dequeue(_queue, _message, _action1);
@@ -541,7 +543,7 @@ public class LocalTransactionTest extends QpidTestCase
assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_transaction.enqueue(_queue, _message, _action1);
@@ -564,7 +566,7 @@ public class LocalTransactionTest extends QpidTestCase
assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
long startTime = System.currentTimeMillis();
_transaction.enqueue(_queue, _message, _action1);
@@ -584,7 +586,7 @@ public class LocalTransactionTest extends QpidTestCase
assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
long startTime = System.currentTimeMillis();
_transaction.enqueue(_queue, _message, _action1);
@@ -606,7 +608,7 @@ public class LocalTransactionTest extends QpidTestCase
for(int i = 0; i < queueDurableFlags.length; i++)
{
- final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]);
+ final TransactionLogResource queue = createQueue(queueDurableFlags[i]);
final ServerMessage message = createTestMessage(messagePersistentFlags[i]);
queueEntries.add(new MockMessageInstance()
@@ -635,28 +637,22 @@ public class LocalTransactionTest extends QpidTestCase
return new MockStoreTransaction(throwException);
}
- private List<AMQQueue> createTestBaseQueues(boolean[] durableFlags)
+ private List<BaseQueue> createTestBaseQueues(boolean[] durableFlags)
{
- List<AMQQueue> queues = new ArrayList<AMQQueue>();
+ List<BaseQueue> queues = new ArrayList<BaseQueue>();
for (boolean b: durableFlags)
{
- queues.add(createTestAMQQueue(b));
+ queues.add(createQueue(b));
}
return queues;
}
- private AMQQueue createTestAMQQueue(final boolean durable)
+ private BaseQueue createQueue(final boolean durable)
{
- return new MockAMQQueue("mockQueue")
- {
- @Override
- public boolean isDurable()
- {
- return durable;
- }
-
- };
+ BaseQueue queue = mock(BaseQueue.class);
+ when(queue.isDurable()).thenReturn(durable);
+ return queue;
}
private ServerMessage createTestMessage(final boolean persistent)
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index ce059db703..d4af18aaec 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -197,7 +197,7 @@ public class ServerConnectionDelegate extends ServerDelegate
try
{
- vhost.getSecurityManager().accessVirtualhost(vhostName);
+ vhost.getSecurityManager().authoriseCreateConnection(sconn);
}
catch (AccessControlException e)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
index 582f951342..a29d56605a 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
@@ -83,7 +83,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
// Check virtualhost access
try
{
- virtualHost.getSecurityManager().accessVirtualhost(virtualHostName);
+ virtualHost.getSecurityManager().authoriseCreateConnection(session);
}
catch (AccessControlException e)
{
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 008f16883a..1e2c7b0652 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.management.amqp;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.util.StateChangeListener;
@@ -88,8 +89,9 @@ class ManagementNodeConsumer implements Consumer
}
@Override
- public void setNoLocal(final boolean noLocal)
+ public MessageSource getMessageSource()
{
+ return _managementNode;
}
@Override