diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-21 01:15:30 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-21 01:15:30 +0000 |
| commit | 7e6f4149a73c4347475caa362f50e4e97d697e2d (patch) | |
| tree | a594a4ba22e59090ce699900f5a78d0c39eaac3a | |
| parent | 344ca0282a94ff5dc364a25186593249bbd478d8 (diff) | |
| download | qpid-python-7e6f4149a73c4347475caa362f50e4e97d697e2d.tar.gz | |
QPID-5567 : Move acl checks into the objects being created
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1570411 13f79535-47bb-0310-9956-ffa450edef68
32 files changed, 723 insertions, 1247 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java index 469a4bb9d0..1e3bafa39e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java @@ -48,6 +48,10 @@ public class Binding _queue = queue; _exchange = exchange; _arguments = arguments == null ? Collections.EMPTY_MAP : Collections.unmodifiableMap(arguments); + + //Perform ACLs + queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this); + } public UUID getId() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java index e23eb397e1..410a0ba2af 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.consumer; import java.util.concurrent.atomic.AtomicLong; + +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; public interface Consumer @@ -48,7 +50,7 @@ public interface Consumer AMQSessionModel getSessionModel(); - void setNoLocal(boolean noLocal); + MessageSource getMessageSource(); long getId(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index cb5902d234..a6aad93b27 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -119,6 +119,9 @@ public abstract class AbstractExchange implements Exchange _id = id; _logSubject = new ExchangeLogSubject(this, this.getVirtualHost()); + // check ACL + host.getSecurityManager().authoriseCreateExchange(this); + // Log Exchange creation CurrentActor.get().message(ExchangeMessages.CREATED(getType().getType(), name, durable)); } @@ -624,9 +627,6 @@ public abstract class AbstractExchange implements Exchange arguments = Collections.emptyMap(); } - //Perform ACLs - _virtualHost.getSecurityManager().authoriseBind(AbstractExchange.this, queue, bindingKey); - if (id == null) { id = UUIDGenerator.generateBindingUUID(getName(), diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 21586c6a4a..80aa4fa49c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -116,8 +116,6 @@ public class DefaultExchangeFactory implements ExchangeFactory public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) throws AMQUnknownExchangeType { - // Check access - _host.getSecurityManager().authoriseCreateExchange(autoDelete, durable, exchange, null, null, null, type); ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(type); if (exchType == null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java index 20e7edda92..28d926686f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index 45dc556732..f7654d63fa 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -20,441 +20,41 @@ */ package org.apache.qpid.server.queue; -import org.apache.log4j.Logger; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.logging.messages.SubscriptionMessages; -import org.apache.qpid.server.logging.subjects.QueueLogSubject; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; -import org.apache.qpid.server.util.StateChangeListener; - -import java.text.MessageFormat; -import java.util.EnumMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; +import org.apache.qpid.server.message.MessageInstance; -class QueueConsumer<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements Consumer +interface QueueConsumer<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends Consumer { - public static enum State - { - ACTIVE, - SUSPENDED, - CLOSED - } - - private static final Logger _logger = Logger.getLogger(QueueConsumer.class); - private final AtomicBoolean _targetClosed = new AtomicBoolean(false); - private final AtomicBoolean _closed = new AtomicBoolean(false); - private final long _id; - private final Lock _stateChangeLock = new ReentrantLock(); - private final long _createTime = System.currentTimeMillis(); - private final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>>(this); - private final boolean _acquires; - private final boolean _seesRequeues; - private final String _consumerName; - private final boolean _isTransient; - private final AtomicLong _deliveredCount = new AtomicLong(0); - private final AtomicLong _deliveredBytes = new AtomicLong(0); - private final FilterManager _filters; - private final Class<? extends ServerMessage> _messageClass; - private final Object _sessionReference; - private Q _queue; - private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId()) - + "(UNKNOWN)" - + "] "); - - static final EnumMap<ConsumerTarget.State, State> STATE_MAP = - new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class); - - static - { - STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE); - STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.SUSPENDED); - STATE_MAP.put(ConsumerTarget.State.CLOSED, State.CLOSED); - } - - private final T _target; - private final SubFlushRunner _runner = new SubFlushRunner(this); - private volatile QueueContext<E,Q,L> _queueContext; - private StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> _stateListener = new StateChangeListener<QueueConsumer<T,E,Q,L>, State>() - { - public void stateChanged(QueueConsumer sub, State oldState, State newState) - { - CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); - } - }; - private boolean _noLocal; - - QueueConsumer(final FilterManager filters, - final Class<? extends ServerMessage> messageClass, - final boolean acquires, - final boolean seesRequeues, - final String consumerName, - final boolean isTransient, - T target) - { - _messageClass = messageClass; - _sessionReference = target.getSessionModel().getConnectionReference(); - _id = SUB_ID_GENERATOR.getAndIncrement(); - _filters = filters; - _acquires = acquires; - _seesRequeues = seesRequeues; - _consumerName = consumerName; - _isTransient = isTransient; - _target = target; - _target.setStateListener( - new StateChangeListener<ConsumerTarget, ConsumerTarget.State>() - { - @Override - public void stateChanged(final ConsumerTarget object, - final ConsumerTarget.State oldState, - final ConsumerTarget.State newState) - { - targetStateChanged(oldState, newState); - } - }); - } - - private void targetStateChanged(final ConsumerTarget.State oldState, final ConsumerTarget.State newState) - { - if(oldState != newState) - { - if(newState == ConsumerTarget.State.CLOSED) - { - if(_targetClosed.compareAndSet(false,true)) - { - CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE()); - } - } - else - { - CurrentActor.get().message(getLogSubject(),SubscriptionMessages.STATE(newState.toString())); - } - } - - if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get()) - { - close(); - } - final StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> stateListener = getStateListener(); - if(stateListener != null) - { - stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState)); - } - } - - public T getTarget() - { - return _target; - } - - @Override - public void externalStateChange() - { - getQueue().deliverAsync(this); - } - - @Override - public long getUnacknowledgedBytes() - { - return _target.getUnacknowledgedBytes(); - } - - @Override - public long getUnacknowledgedMessages() - { - return _target.getUnacknowledgedMessages(); - } - - @Override - public AMQSessionModel getSessionModel() - { - return _target.getSessionModel(); - } - - @Override - public boolean isSuspended() - { - return _target.isSuspended(); - } - - @Override - public void close() - { - if(_closed.compareAndSet(false,true)) - { - getSendLock(); - try - { - _target.close(); - _target.consumerRemoved(this); - _queue.unregisterConsumer(this); - } - finally - { - releaseSendLock(); - } - - } - } - - void flushBatched() - { - _target.flushBatched(); - } - - void queueDeleted() - { - _target.queueDeleted(); - } - - boolean wouldSuspend(final MessageInstance msg) - { - return !_target.allocateCredit(msg.getMessage()); - } - - void restoreCredit(final MessageInstance queueEntry) - { - _target.restoreCredit(queueEntry.getMessage()); - } - - void queueEmpty() - { - _target.queueEmpty(); - } - - State getState() - { - return STATE_MAP.get(_target.getState()); - } - - public final Q getQueue() - { - return _queue; - } - - final void setQueue(Q queue, boolean exclusive) - { - if(getQueue() != null) - { - throw new IllegalStateException("Attempt to set queue for consumer " + this + " to " + queue + "when already set to " + getQueue()); - } - _queue = queue; - - String queueString = new QueueLogSubject(_queue).toLogString(); - - _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId()) - + "(" - // queueString is [vh(/{0})/qu({1}) ] so need to trim - // ^ ^^ - + queueString.substring(1,queueString.length() - 3) - + ")" - + "] "); - - - if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, _logActor.getLogSubject(), SubscriptionMessages.CREATE_LOG_HIERARCHY)) - { - final String filterLogString = getFilterLogString(); - CurrentActor.get().message(_logActor.getLogSubject(), SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive, - filterLogString.length() > 0)); - } - } - - protected final LogSubject getLogSubject() - { - return _logActor.getLogSubject(); - } - - final LogActor getLogActor() - { - return _logActor; - } - - - @Override - public final void flush() - { - getQueue().flushConsumer(this); - } - - boolean resend(final E entry) - { - return getQueue().resend(entry, this); - } - - final SubFlushRunner getRunner() - { - return _runner; - } - - public final long getId() - { - return _id; - } - - public final StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> getStateListener() - { - return _stateListener; - } + void flushBatched(); - public final void setStateListener(StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> listener) - { - _stateListener = listener; - } + void queueEmpty(); - final QueueContext<E,Q,L> getQueueContext() - { - return _queueContext; - } + boolean hasInterest(E node); - final void setQueueContext(QueueContext<E,Q,L> queueContext) - { - _queueContext = queueContext; - } + boolean wouldSuspend(E entry); - public final boolean isActive() - { - return getState() == State.ACTIVE; - } + void restoreCredit(E entry); - public final boolean isClosed() - { - return getState() == State.CLOSED; - } + void send(E entry, boolean batch); - public final void setNoLocal(boolean noLocal) - { - _noLocal = noLocal; - } + void queueDeleted(); - public final boolean hasInterest(E entry) - { - //check that the message hasn't been rejected - if (entry.isRejectedBy(this)) - { + SubFlushRunner getRunner(); - return false; - } - - if (entry.getMessage().getClass() == _messageClass) - { - if(_noLocal) - { - Object connectionRef = entry.getMessage().getConnectionReference(); - if (connectionRef != null && connectionRef == _sessionReference) - { - return false; - } - } - } - else - { - // no interest in messages we can't convert - if(_messageClass != null && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), - _messageClass)==null) - { - return false; - } - } - return (_filters == null) || _filters.allAllow(entry.asFilterable()); - } - - protected String getFilterLogString() - { - StringBuilder filterLogString = new StringBuilder(); - String delimiter = ", "; - boolean hasEntries = false; - if (_filters != null && _filters.hasFilters()) - { - filterLogString.append(_filters.toString()); - hasEntries = true; - } + Q getQueue(); - if (!acquires()) - { - if (hasEntries) - { - filterLogString.append(delimiter); - } - filterLogString.append("Browser"); - } + boolean resend(E e); - return filterLogString.toString(); - } - - public final boolean trySendLock() - { - return _stateChangeLock.tryLock(); - } - - public final void getSendLock() - { - _stateChangeLock.lock(); - } - - public final void releaseSendLock() - { - _stateChangeLock.unlock(); - } - - public final long getCreateTime() - { - return _createTime; - } - - final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState() - { - return _owningState; - } - - public final boolean acquires() - { - return _acquires; - } - - public final boolean seesRequeues() - { - return _seesRequeues; - } - - public final String getName() - { - return _consumerName; - } - - public final boolean isTransient() - { - return _isTransient; - } - - public final long getBytesOut() + public static enum State { - return _deliveredBytes.longValue(); + ACTIVE, + SUSPENDED, + CLOSED } - public final long getMessagesOut() - { - return _deliveredCount.longValue(); - } + MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState(); - final void send(final E entry, final boolean batch) - { - _deliveredCount.incrementAndGet(); - ServerMessage message = entry.getMessage(); - _deliveredBytes.addAndGet(message.getSize()); - _target.send(entry, batch); - } + QueueContext<E,Q,L> getQueueContext(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java new file mode 100644 index 0000000000..eeeab76656 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -0,0 +1,458 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.logging.messages.SubscriptionMessages; +import org.apache.qpid.server.logging.subjects.QueueLogSubject; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.MessageConverterRegistry; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.util.StateChangeListener; + +import java.text.MessageFormat; +import java.util.EnumMap; +import java.util.EnumSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; + +class QueueConsumerImpl<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements QueueConsumer<T,E,Q,L> +{ + + + private static final Logger _logger = Logger.getLogger(QueueConsumerImpl.class); + private final AtomicBoolean _targetClosed = new AtomicBoolean(false); + private final AtomicBoolean _closed = new AtomicBoolean(false); + private final long _id; + private final Lock _stateChangeLock = new ReentrantLock(); + private final long _createTime = System.currentTimeMillis(); + private final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>>(this); + private final boolean _acquires; + private final boolean _seesRequeues; + private final String _consumerName; + private final boolean _isTransient; + private final AtomicLong _deliveredCount = new AtomicLong(0); + private final AtomicLong _deliveredBytes = new AtomicLong(0); + private final FilterManager _filters; + private final Class<? extends ServerMessage> _messageClass; + private final Object _sessionReference; + private final Q _queue; + private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId()) + + "(UNKNOWN)" + + "] "); + + static final EnumMap<ConsumerTarget.State, State> STATE_MAP = + new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class); + + static + { + STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE); + STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.SUSPENDED); + STATE_MAP.put(ConsumerTarget.State.CLOSED, State.CLOSED); + } + + private final T _target; + private final SubFlushRunner _runner = new SubFlushRunner(this); + private volatile QueueContext<E,Q,L> _queueContext; + private StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> _stateListener = new StateChangeListener<QueueConsumerImpl<T,E,Q,L>, State>() + { + public void stateChanged(QueueConsumerImpl sub, State oldState, State newState) + { + CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); + } + }; + private final boolean _noLocal; + + QueueConsumerImpl(final Q queue, + T target, final String consumerName, + final FilterManager filters, + final Class<? extends ServerMessage> messageClass, + EnumSet<Option> optionSet) + { + + _messageClass = messageClass; + _sessionReference = target.getSessionModel().getConnectionReference(); + _id = SUB_ID_GENERATOR.getAndIncrement(); + _filters = filters; + _acquires = optionSet.contains(Option.ACQUIRES); + _seesRequeues = optionSet.contains(Option.SEES_REQUEUES); + _consumerName = consumerName; + _isTransient = optionSet.contains(Option.TRANSIENT); + _target = target; + _queue = queue; + _noLocal = optionSet.contains(Option.NO_LOCAL); + setupLogging(optionSet.contains(Option.EXCLUSIVE)); + + // Access control + _queue.getVirtualHost().getSecurityManager().authoriseCreateConsumer(this); + + + _target.setStateListener( + new StateChangeListener<ConsumerTarget, ConsumerTarget.State>() + { + @Override + public void stateChanged(final ConsumerTarget object, + final ConsumerTarget.State oldState, + final ConsumerTarget.State newState) + { + targetStateChanged(oldState, newState); + } + }); + } + + private void targetStateChanged(final ConsumerTarget.State oldState, final ConsumerTarget.State newState) + { + if(oldState != newState) + { + if(newState == ConsumerTarget.State.CLOSED) + { + if(_targetClosed.compareAndSet(false,true)) + { + CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE()); + } + } + else + { + CurrentActor.get().message(getLogSubject(),SubscriptionMessages.STATE(newState.toString())); + } + } + + if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get()) + { + close(); + } + final StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> stateListener = getStateListener(); + if(stateListener != null) + { + stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState)); + } + } + + public T getTarget() + { + return _target; + } + + @Override + public void externalStateChange() + { + getQueue().deliverAsync(this); + } + + @Override + public long getUnacknowledgedBytes() + { + return _target.getUnacknowledgedBytes(); + } + + @Override + public long getUnacknowledgedMessages() + { + return _target.getUnacknowledgedMessages(); + } + + @Override + public AMQSessionModel getSessionModel() + { + return _target.getSessionModel(); + } + + @Override + public MessageSource getMessageSource() + { + return _queue; + } + + @Override + public boolean isSuspended() + { + return _target.isSuspended(); + } + + @Override + public void close() + { + if(_closed.compareAndSet(false,true)) + { + getSendLock(); + try + { + _target.close(); + _target.consumerRemoved(this); + _queue.unregisterConsumer(this); + } + finally + { + releaseSendLock(); + } + + } + } + + public void flushBatched() + { + _target.flushBatched(); + } + + public void queueDeleted() + { + _target.queueDeleted(); + } + + public boolean wouldSuspend(final E msg) + { + return !_target.allocateCredit(msg.getMessage()); + } + + public void restoreCredit(final E queueEntry) + { + _target.restoreCredit(queueEntry.getMessage()); + } + + public void queueEmpty() + { + _target.queueEmpty(); + } + + State getState() + { + return STATE_MAP.get(_target.getState()); + } + + public final Q getQueue() + { + return _queue; + } + + private void setupLogging(final boolean exclusive) + { + String queueString = new QueueLogSubject(_queue).toLogString(); + + _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId()) + + "(" + // queueString is [vh(/{0})/qu({1}) ] so need to trim + // ^ ^^ + + queueString.substring(1,queueString.length() - 3) + + ")" + + "] "); + + + if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, _logActor.getLogSubject(), SubscriptionMessages.CREATE_LOG_HIERARCHY)) + { + final String filterLogString = getFilterLogString(); + CurrentActor.get().message(_logActor.getLogSubject(), SubscriptionMessages.CREATE(filterLogString, _queue.isDurable() && exclusive, + filterLogString.length() > 0)); + } + } + + protected final LogSubject getLogSubject() + { + return _logActor.getLogSubject(); + } + + final LogActor getLogActor() + { + return _logActor; + } + + + @Override + public final void flush() + { + getQueue().flushConsumer(this); + } + + public boolean resend(final E entry) + { + return getQueue().resend(entry, this); + } + + public final SubFlushRunner getRunner() + { + return _runner; + } + + public final long getId() + { + return _id; + } + + public final StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> getStateListener() + { + return _stateListener; + } + + public final void setStateListener(StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> listener) + { + _stateListener = listener; + } + + public final QueueContext<E,Q,L> getQueueContext() + { + return _queueContext; + } + + final void setQueueContext(QueueContext<E,Q,L> queueContext) + { + _queueContext = queueContext; + } + + public final boolean isActive() + { + return getState() == State.ACTIVE; + } + + public final boolean isClosed() + { + return getState() == State.CLOSED; + } + + public final boolean hasInterest(E entry) + { + //check that the message hasn't been rejected + if (entry.isRejectedBy(this)) + { + + return false; + } + + if (entry.getMessage().getClass() == _messageClass) + { + if(_noLocal) + { + Object connectionRef = entry.getMessage().getConnectionReference(); + if (connectionRef != null && connectionRef == _sessionReference) + { + return false; + } + } + } + else + { + // no interest in messages we can't convert + if(_messageClass != null && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), + _messageClass)==null) + { + return false; + } + } + return (_filters == null) || _filters.allAllow(entry.asFilterable()); + } + + protected String getFilterLogString() + { + StringBuilder filterLogString = new StringBuilder(); + String delimiter = ", "; + boolean hasEntries = false; + if (_filters != null && _filters.hasFilters()) + { + filterLogString.append(_filters.toString()); + hasEntries = true; + } + + if (!acquires()) + { + if (hasEntries) + { + filterLogString.append(delimiter); + } + filterLogString.append("Browser"); + } + + return filterLogString.toString(); + } + + public final boolean trySendLock() + { + return _stateChangeLock.tryLock(); + } + + public final void getSendLock() + { + _stateChangeLock.lock(); + } + + public final void releaseSendLock() + { + _stateChangeLock.unlock(); + } + + public final long getCreateTime() + { + return _createTime; + } + + public final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState() + { + return _owningState; + } + + public final boolean acquires() + { + return _acquires; + } + + public final boolean seesRequeues() + { + return _seesRequeues; + } + + public final String getName() + { + return _consumerName; + } + + public final boolean isTransient() + { + return _isTransient; + } + + public final long getBytesOut() + { + return _deliveredBytes.longValue(); + } + + public final long getMessagesOut() + { + return _deliveredCount.longValue(); + } + + public final void send(final E entry, final boolean batch) + { + _deliveredCount.incrementAndGet(); + ServerMessage message = entry.getMessage(); + _deliveredBytes.addAndGet(message.getSize()); + _target.send(entry, batch); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 6cbbbf0cb2..45660dec37 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -18,7 +18,6 @@ */ package org.apache.qpid.server.queue; -import java.security.AccessControlContext; import java.security.AccessController; import java.security.Principal; import java.util.*; @@ -192,8 +191,19 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA Map<String, Object> attributes, QueueEntryListFactory<E, Q, L> entryListFactory) { + if (virtualHost == null) + { + throw new IllegalArgumentException("Virtual Host must not be null"); + } + UUID id = MapValueConverter.getUUIDAttribute(Queue.ID, attributes); String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes); + + if (name == null) + { + throw new IllegalArgumentException("Queue name must not be null"); + } + boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE,attributes,false); @@ -206,6 +216,30 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA attributes, LifetimePolicy.PERMANENT); + + _name = name; + _durable = durable; + _virtualHost = virtualHost; + _entries = entryListFactory.createQueueEntryList((Q) this); + final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes); + + arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy); + arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy); + + _arguments = Collections.synchronizedMap(arguments); + _description = MapValueConverter.getStringAttribute(Queue.DESCRIPTION, attributes, null); + + _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false); + + + _id = id; + _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); + + _logSubject = new QueueLogSubject(this); + _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger()); + + virtualHost.getSecurityManager().authoriseCreateQueue(this); + Subject activeSubject = Subject.getSubject(AccessController.getContext()); Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class); AMQSessionModel<?,?> sessionModel; @@ -294,36 +328,7 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA } } - if (name == null) - { - throw new IllegalArgumentException("Queue name must not be null"); - } - - if (virtualHost == null) - { - throw new IllegalArgumentException("Virtual Host must not be null"); - } - - _name = name; - _durable = durable; - _virtualHost = virtualHost; - _entries = entryListFactory.createQueueEntryList((Q) this); - final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes); - arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy); - arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy); - - _arguments = Collections.synchronizedMap(arguments); - _description = MapValueConverter.getStringAttribute(Queue.DESCRIPTION, attributes, null); - - _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false); - - - _id = id; - _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); - - _logSubject = new QueueLogSubject(this); - _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger()); if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE)) @@ -593,40 +598,38 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA ConsumerAccessRefused { - // Access control - getVirtualHost().getSecurityManager().authoriseConsume(this); - if (hasExclusiveConsumer()) { throw new ExistingExclusiveConsumer(); } + Object exclusiveOwner = _exclusiveOwner; switch(_exclusivityPolicy) { case CONNECTION: - if(_exclusiveOwner == null) + if(exclusiveOwner == null) { - _exclusiveOwner = target.getSessionModel().getConnectionModel(); + exclusiveOwner = target.getSessionModel().getConnectionModel(); addExclusivityConstraint(target.getSessionModel().getConnectionModel()); } else { - if(_exclusiveOwner != target.getSessionModel().getConnectionModel()) + if(exclusiveOwner != target.getSessionModel().getConnectionModel()) { throw new ConsumerAccessRefused(); } } break; case SESSION: - if(_exclusiveOwner == null) + if(exclusiveOwner == null) { - _exclusiveOwner = target.getSessionModel(); + exclusiveOwner = target.getSessionModel(); addExclusivityConstraint(target.getSessionModel()); } else { - if(_exclusiveOwner != target.getSessionModel()) + if(exclusiveOwner != target.getSessionModel()) { throw new ConsumerAccessRefused(); } @@ -639,26 +642,26 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA } break; case PRINCIPAL: - if(_exclusiveOwner == null) + if(exclusiveOwner == null) { - _exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); + exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); } else { - if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal())) + if(!exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal())) { throw new ConsumerAccessRefused(); } } break; case CONTAINER: - if(_exclusiveOwner == null) + if(exclusiveOwner == null) { - _exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName(); + exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName(); } else { - if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName())) + if(!exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName())) { throw new ConsumerAccessRefused(); } @@ -673,15 +676,24 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE); boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT); + if(_noLocal && !optionSet.contains(Consumer.Option.NO_LOCAL)) + { + optionSet = EnumSet.copyOf(optionSet); + optionSet.add(Consumer.Option.NO_LOCAL); + } + if(exclusive && getConsumerCount() != 0) { throw new ExistingConsumerPreventsExclusive(); } - QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass, - optionSet.contains(Consumer.Option.ACQUIRES), - optionSet.contains(Consumer.Option.SEES_REQUEUES), - consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target); + QueueConsumerImpl<T,E,Q,L> consumer = new QueueConsumerImpl<T,E,Q,L>((Q)this, + target, + consumerName, + filters, messageClass, + optionSet); + + _exclusiveOwner = exclusiveOwner; target.consumerAdded(consumer); @@ -700,12 +712,6 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA if (!isDeleted()) { - consumer.setQueue((Q)this, exclusive); - if(_noLocal) - { - consumer.setNoLocal(true); - } - synchronized (_consumerListeners) { for(ConsumerRegistrationListener<Q> listener : _consumerListeners) @@ -732,7 +738,7 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA } - synchronized void unregisterConsumer(final QueueConsumer<?,E,Q,L> consumer) + synchronized void unregisterConsumer(final QueueConsumerImpl<?,E,Q,L> consumer) { if (consumer == null) { @@ -1021,7 +1027,7 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA { return true; } - QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry); + QueueConsumer<?,E,Q,L> assigned = _messageGroupManager.getAssignedConsumer(entry); return (assigned == null) || (assigned == sub); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index c35c41b4f7..cba992f347 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHost; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java index 2100e3c5cd..3026f33604 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java @@ -36,7 +36,7 @@ class SubFlushRunner implements Runnable private static final Logger _logger = Logger.getLogger(SubFlushRunner.class); - private final QueueConsumer _sub; + private final QueueConsumerImpl _sub; private static int IDLE = 0; private static int SCHEDULED = 1; @@ -49,7 +49,7 @@ class SubFlushRunner implements Runnable private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES; private final AtomicBoolean _stateChange = new AtomicBoolean(); - public SubFlushRunner(QueueConsumer sub) + public SubFlushRunner(QueueConsumerImpl sub) { _sub = sub; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java index 85be4c6a3d..8dd8dda220 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java @@ -20,15 +20,14 @@ package org.apache.qpid.server.security; import org.apache.log4j.Logger; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.model.AccessControlProvider; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.ConfigurationChangeListener; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.*; import org.apache.qpid.server.plugin.AccessControlFactory; import org.apache.qpid.server.plugin.QpidServiceLoader; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.access.FileAccessControlProviderConstants; import org.apache.qpid.server.security.access.ObjectProperties; @@ -253,20 +252,24 @@ public class SecurityManager implements ConfigurationChangeListener return true; } - public void authoriseBind(final Exchange exch, final AMQQueue queue, final String routingKey) + public void authoriseCreateBinding(Binding binding) { + final Exchange exch = binding.getExchange(); + final AMQQueue queue = binding.getQueue(); + final String bindingKey = binding.getBindingKey(); + boolean allowed = checkAllPlugins(new AccessCheck() { Result allowed(AccessControl plugin) { - return plugin.authorise(BIND, EXCHANGE, new ObjectProperties(exch, queue, routingKey)); + return plugin.authorise(BIND, EXCHANGE, new ObjectProperties(exch, queue, bindingKey)); } }); if(!allowed) { - throw new AccessControlException("Permission denied: binding " + routingKey); + throw new AccessControlException("Permission denied: binding " + bindingKey); } } @@ -306,7 +309,7 @@ public class SecurityManager implements ConfigurationChangeListener } } - public void accessVirtualhost(final String vhostname) + public void authoriseCreateConnection(final AMQConnectionModel connection) { if(!checkAllPlugins(new AccessCheck() { @@ -316,12 +319,15 @@ public class SecurityManager implements ConfigurationChangeListener } })) { - throw new AccessControlException("Permission denied: " + vhostname); + throw new AccessControlException("Permission denied: " + connection.getVirtualHostName()); } } - public void authoriseConsume(final AMQQueue queue) + public void authoriseCreateConsumer(final Consumer consumer) { + // TODO + final AMQQueue queue = (AMQQueue) consumer.getMessageSource(); + if(!checkAllPlugins(new AccessCheck() { Result allowed(AccessControl plugin) @@ -334,20 +340,17 @@ public class SecurityManager implements ConfigurationChangeListener } } - public void authoriseCreateExchange(final Boolean autoDelete, - final Boolean durable, - final String exchangeName, - final Boolean internal, - final Boolean nowait, - final Boolean passive, - final String exchangeType) + public void authoriseCreateExchange(final Exchange exchange) { + final String exchangeName = exchange.getName(); if(!checkAllPlugins(new AccessCheck() { Result allowed(AccessControl plugin) { - return plugin.authorise(CREATE, EXCHANGE, new ObjectProperties(autoDelete, durable, exchangeName, - internal, nowait, passive, exchangeType)); + return plugin.authorise(CREATE, EXCHANGE, new ObjectProperties(exchange.isAutoDelete(), + exchange.isDurable(), + exchangeName, + exchange.getTypeName())); } })) { @@ -355,14 +358,18 @@ public class SecurityManager implements ConfigurationChangeListener } } - public void authoriseCreateQueue(final Boolean autoDelete, final Boolean durable, final Boolean exclusive, - final Boolean nowait, final Boolean passive, final String queueName, final String owner) + public void authoriseCreateQueue(final AMQQueue queue) { + final String queueName = queue.getName(); if(! checkAllPlugins(new AccessCheck() { Result allowed(AccessControl plugin) { - return plugin.authorise(CREATE, QUEUE, new ObjectProperties(autoDelete, durable, exclusive, nowait, passive, queueName, owner)); + return plugin.authorise(CREATE, QUEUE, new ObjectProperties(queue.getAttribute(Queue.LIFETIME_POLICY) != LifetimePolicy.PERMANENT, + Boolean.TRUE.equals(queue.getAttribute(Queue.DURABLE)), + queue.getAttribute(Queue.EXCLUSIVE) != ExclusivityPolicy.NONE, + queueName, + queue.getOwner())); } })) { @@ -370,6 +377,7 @@ public class SecurityManager implements ConfigurationChangeListener } } + public void authoriseDelete(final AMQQueue queue) { if(!checkAllPlugins(new AccessCheck() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java index b9e0e5920f..7f136a953f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java @@ -182,7 +182,7 @@ public class ObjectProperties } public ObjectProperties(Boolean autoDelete, Boolean durable, String exchangeName, - Boolean internal, Boolean nowait, Boolean passive, String exchangeType) + String exchangeType) { super(); @@ -191,14 +191,11 @@ public class ObjectProperties put(Property.AUTO_DELETE, autoDelete); put(Property.TEMPORARY, autoDelete); put(Property.DURABLE, durable); - put(Property.INTERNAL, internal); - put(Property.NO_WAIT, nowait); - put(Property.PASSIVE, passive); put(Property.TYPE, exchangeType); } - public ObjectProperties(Boolean autoDelete, Boolean durable, Boolean exclusive, Boolean nowait, Boolean passive, - String queueName, String owner) + public ObjectProperties(Boolean autoDelete, Boolean durable, Boolean exclusive, + String queueName, String owner) { super(); @@ -208,8 +205,6 @@ public class ObjectProperties put(Property.TEMPORARY, autoDelete); put(Property.DURABLE, durable); put(Property.EXCLUSIVE, exclusive); - put(Property.NO_WAIT, nowait); - put(Property.PASSIVE, passive); put(Property.OWNER, owner); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index feb619f01a..190226f33c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -550,15 +550,6 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg ExclusivityPolicy exclusive = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,Queue.EXCLUSIVE, attributes, ExclusivityPolicy.NONE); String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); - // Access check - getSecurityManager().authoriseCreateQueue(autoDelete, - durable, - exclusive != null && exclusive != ExclusivityPolicy.NONE, - null, - null, - queueName, - owner); - synchronized (_queueRegistry) { if(_queueRegistry.getQueue(queueName) != null) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java index 8919599cba..1822992644 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java @@ -25,12 +25,17 @@ import junit.framework.TestCase; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.security.*; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.HashMap; import java.util.Map; import java.util.Set; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** */ public class HeadersBindingTest extends TestCase @@ -73,7 +78,7 @@ public class HeadersBindingTest extends TestCase public String getEncoding() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public byte getPriority() @@ -131,12 +136,15 @@ public class HeadersBindingTest extends TestCase private Map<String,Object> bindHeaders = new HashMap<String,Object>(); private MockHeader matchHeaders = new MockHeader(); private int _count = 0; - private MockAMQQueue _queue; + private AMQQueue _queue; protected void setUp() { _count++; - _queue = new MockAMQQueue(getQueueName()); + _queue = mock(AMQQueue.class); + VirtualHost vhost = mock(VirtualHost.class); + when(_queue.getVirtualHost()).thenReturn(vhost); + when(vhost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); } protected String getQueueName() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java index 9acaadf9e1..f083d2ba5d 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java @@ -22,10 +22,12 @@ package org.apache.qpid.server.logging.subjects; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * Validate BindingLogSubjects are logged as expected */ @@ -45,8 +47,9 @@ public class BindingLogSubjectTest extends AbstractTestLogSubject _testVhost = BrokerTestHelper.createVirtualHost("test"); _routingKey = "RoutingKey"; _exchange = _testVhost.getExchange("amq.direct"); - _queue = new MockAMQQueue("BindingLogSubjectTest"); - ((MockAMQQueue) _queue).setVirtualHost(_testVhost); + _queue = mock(AMQQueue.class); + when(_queue.getName()).thenReturn("BindingLogSubjectTest"); + when(_queue.getVirtualHost()).thenReturn(_testVhost); _subject = new BindingLogSubject(_routingKey, _exchange, _queue); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java index f89959febb..ccd694837e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java @@ -21,10 +21,12 @@ package org.apache.qpid.server.logging.subjects; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * Validate QueueLogSubjects are logged as expected */ @@ -41,8 +43,9 @@ public class QueueLogSubjectTest extends AbstractTestLogSubject _testVhost = BrokerTestHelper.createVirtualHost("test"); - _queue = new MockAMQQueue("QueueLogSubjectTest"); - ((MockAMQQueue) _queue).setVirtualHost(_testVhost); + _queue = mock(AMQQueue.class); + when(_queue.getName()).thenReturn("QueueLogSubjectTest"); + when(_queue.getVirtualHost()).thenReturn(_testVhost); _subject = new QueueLogSubject(_queue); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 5f1e88dba9..57f7cf9729 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -43,6 +43,8 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.security.*; +import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -66,6 +68,7 @@ public class AMQQueueFactoryTest extends QpidTestCase _queues = new ArrayList<AMQQueue>(); _virtualHost = mock(VirtualHost.class); + when(_virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); VirtualHostConfiguration vhostConfig = mock(VirtualHostConfiguration.class); when(_virtualHost.getConfiguration()).thenReturn(vhostConfig); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java index e51b575895..e77125bb0e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java @@ -27,6 +27,8 @@ import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.security.*; +import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.HashMap; @@ -52,7 +54,10 @@ public class ConflationQueueListTest extends TestCase queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.LVQ_KEY, CONFLATION_KEY); - _queue = new ConflationQueue(mock(VirtualHost.class), queueAttributes); + final VirtualHost virtualHost = mock(VirtualHost.class); + when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); + + _queue = new ConflationQueue(virtualHost, queueAttributes); _list = _queue.getEntries(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java index 35508bb2c4..dc920bf555 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; @@ -55,9 +56,11 @@ public class ConsumerListTest extends QpidTestCase private QueueConsumer newMockConsumer() { - ConsumerTarget target = mock(ConsumerTarget.class); - when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class)); - return new QueueConsumer(null,null,true,true,"sub",false,target); + QueueConsumer consumer = mock(QueueConsumer.class); + MessageInstance.ConsumerAcquiredState owningState = new QueueEntryImpl.ConsumerAcquiredState(consumer); + when(consumer.getOwningState()).thenReturn(owningState); + + return consumer; } /** diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java deleted file mode 100644 index 934d60a23f..0000000000 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ /dev/null @@ -1,644 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.ExclusivityPolicy; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.consumer.Consumer; -import org.apache.qpid.server.consumer.ConsumerTarget; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; - -public class MockAMQQueue implements AMQQueue -{ - private boolean _deleted = false; - private String _name; - private VirtualHost _virtualhost; - - private AuthorizationHolder _authorizationHolder; - - private AMQSessionModel _exclusiveOwner; - private List<Binding> _bindings = new CopyOnWriteArrayList<Binding>(); - private boolean _autoDelete; - - public MockAMQQueue(String name) - { - _name = name; - } - - @Override - public void setExclusivityPolicy(final ExclusivityPolicy desiredPolicy) - throws ExistingConsumerPreventsExclusive - { - - } - - public void addBinding(final Binding binding) - { - _bindings.add(binding); - } - - public void removeBinding(final Binding binding) - { - _bindings.remove(binding); - } - - public List<Binding> getBindings() - { - return _bindings; - } - - public int getBindingCount() - { - return 0; - } - - public LogSubject getLogSubject() - { - return new LogSubject() - { - public String toLogString() - { - return "[MockAMQQueue]"; - } - - }; - } - - public long getUnackedMessageBytes() - { - return 0; - } - - public long getMessageDequeueCount() - { - return 0; - } - - public long getTotalEnqueueSize() - { - return 0; - } - - public long getTotalDequeueSize() - { - return 0; - } - - public long getTotalDequeueCount() - { - return 0; - } - - public long getTotalEnqueueCount() - { - return 0; - } - - @Override - public LifetimePolicy getLifetimePolicy() - { - return null; - } - - public int getBindingCountHigh() - { - return 0; - } - - public long getPersistentByteEnqueues() - { - return 0; - } - - public long getPersistentByteDequeues() - { - return 0; - } - - public long getPersistentMsgEnqueues() - { - return 0; - } - - public long getPersistentMsgDequeues() - { - return 0; - } - - public void purge(final long request) - { - - } - - public long getCreateTime() - { - return 0; - } - - public void setNoLocal(boolean b) - { - - } - - public UUID getId() - { - return null; - } - - - - public boolean isDurable() - { - return false; - } - - public boolean isAutoDelete() - { - return _autoDelete; - } - - public void setAutoDelete(boolean autodelete) - { - _autoDelete = autodelete; - } - - - public String getOwner() - { - return null; - } - - public void setVirtualHost(VirtualHost virtualhost) - { - _virtualhost = virtualhost; - } - - public VirtualHost getVirtualHost() - { - return _virtualhost; - } - - @Override - public boolean resend(final QueueEntry entry, final Consumer consumer) - { - return false; - } - - @Override - public void addDeleteTask(final Action task) - { - - } - - @Override - public void enqueue(final ServerMessage message, final Action action) - { - - } - - @Override - public int compareTo(final Object o) - { - return 0; - } - - @Override - public Consumer addConsumer(final ConsumerTarget target, - final FilterManager filters, - final Class messageClass, - final String consumerName, - final EnumSet options) - { - return new QueueConsumer(filters, messageClass, options.contains(Consumer.Option.ACQUIRES), - options.contains(Consumer.Option.SEES_REQUEUES), consumerName, - options.contains(Consumer.Option.TRANSIENT), target ); - } - - - public String getName() - { - return _name; - } - - public int send(final ServerMessage message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action postEnqueueAction) - { - return 0; - } - - public Collection<QueueConsumer> getConsumers() - { - return Collections.emptyList(); - } - - public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener) - { - - } - - public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener) - { - - } - - public int getConsumerCount() - { - return 0; - } - - public int getActiveConsumerCount() - { - return 0; - } - - public boolean hasExclusiveConsumer() - { - return false; - } - - public boolean isUnused() - { - return false; - } - - public boolean isEmpty() - { - return false; - } - - public int getMessageCount() - { - return 0; - } - - public int getUndeliveredMessageCount() - { - return 0; - } - - public long getQueueDepth() - { - return 0; - } - - public long getReceivedMessageCount() - { - return 0; - } - - public long getOldestMessageArrivalTime() - { - return 0; - } - - public boolean isDeleted() - { - return _deleted; - } - - public int delete() - { - _deleted = true; - return getMessageCount(); - } - - - public void requeue(QueueEntry entry) - { - } - - public void dequeue(QueueEntry entry) - { - } - - @Override - public void removeDeleteTask(final Action task) - { - - } - - @Override - public void decrementUnackedMsgCount(final QueueEntry queueEntry) - { - - } - - @Override - public List getMessagesOnTheQueue() - { - return null; - } - - public List getMessagesOnTheQueue(long fromMessageId, long toMessageId) - { - return null; - } - - public List<Long> getMessagesOnTheQueue(int num) - { - return null; - } - - public List<Long> getMessagesOnTheQueue(int num, int offset) - { - return null; - } - - public QueueEntry getMessageOnTheQueue(long messageId) - { - return null; - } - - public List getMessagesRangeOnTheQueue(long fromPosition, long toPosition) - { - return null; - } - - public long getMaximumMessageSize() - { - return 0; - } - - public void setMaximumMessageSize(long value) - { - - } - - public long getMaximumMessageCount() - { - return 0; - } - - public void setMaximumMessageCount(long value) - { - - } - - public long getMaximumQueueDepth() - { - return 0; - } - - public void setMaximumQueueDepth(long value) - { - - } - - public long getMaximumMessageAge() - { - return 0; - } - - public void setMaximumMessageAge(long maximumMessageAge) - { - - } - - public long getMinimumAlertRepeatGap() - { - return 0; - } - - public long clearQueue() - { - return 0; - } - - - public void checkMessageStatus() - { - - } - - public Set<NotificationCheck> getNotificationChecks() - { - return null; - } - - public void flushConsumer(Consumer sub) - { - - } - - public void deliverAsync(Consumer sub) - { - - } - - public void deliverAsync() - { - - } - - public void stop() - { - - } - - public boolean isExclusive() - { - return false; - } - - @Override - public boolean verifySessionAccess(final AMQSessionModel session) - { - return false; - } - - public Exchange getAlternateExchange() - { - return null; - } - - public void setAlternateExchange(Exchange exchange) - { - - } - - @Override - public Collection<String> getAvailableAttributes() - { - return null; - } - - @Override - public Object getAttribute(String attrName) - { - return null; - } - - public void checkCapacity(AMQSessionModel channel) - { - } - - public int compareTo(AMQQueue o) - { - return 0; - } - - public void setMinimumAlertRepeatGap(long value) - { - - } - - public long getCapacity() - { - return 0; - } - - public void setCapacity(long capacity) - { - - } - - public long getFlowResumeCapacity() - { - return 0; - } - - public void setFlowResumeCapacity(long flowResumeCapacity) - { - - } - - public void configure(QueueConfiguration config) - { - - } - - public AuthorizationHolder getAuthorizationHolder() - { - return _authorizationHolder; - } - - public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder) - { - _authorizationHolder = authorizationHolder; - } - - public AMQSessionModel getExclusiveOwningSession() - { - return _exclusiveOwner; - } - - public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner) - { - _exclusiveOwner = exclusiveOwner; - } - - public boolean isOverfull() - { - return false; - } - - public int getConsumerCountHigh() - { - return 0; - } - - public long getByteTxnEnqueues() - { - return 0; - } - - public long getMsgTxnEnqueues() - { - return 0; - } - - public long getByteTxnDequeues() - { - return 0; - } - - public long getMsgTxnDequeues() - { - return 0; - } - - - public long getUnackedMessageCount() - { - return 0; - } - - public long getUnackedMessageCountHigh() - { - return 0; - } - - public void setExclusive(boolean exclusive) - { - } - - public int getMaximumDeliveryCount() - { - return 0; - } - - public void setMaximumDeliveryCount(int maximumDeliveryCount) - { - } - - public void visit(final QueueEntryVisitor visitor) - { - } - - @Override - public void setNotificationListener(NotificationListener listener) - { - } - - @Override - public void setDescription(String description) - { - } - - @Override - public String getDescription() - { - return null; - } - -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java index 8b2ee70b26..44d12f2763 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java @@ -27,6 +27,8 @@ import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; + +import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -51,7 +53,9 @@ public class PriorityQueueListTest extends QpidTestCase queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.PRIORITIES, 10); - PriorityQueue queue = new PriorityQueue(mock(VirtualHost.class), queueAttributes); + final VirtualHost virtualHost = mock(VirtualHost.class); + when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); + PriorityQueue queue = new PriorityQueue(virtualHost, queueAttributes); _list = queue.getEntries(); for (int i = 0; i < PRIORITIES.length; i++) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index dc91be9cdb..fbebdb6008 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -24,11 +24,13 @@ import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageInstance.EntryState; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.security.*; import org.apache.qpid.server.virtualhost.VirtualHost; import java.lang.reflect.Field; @@ -48,6 +50,7 @@ public abstract class QueueEntryImplTestBase extends TestCase protected QueueEntryImpl _queueEntry; protected QueueEntryImpl _queueEntry2; protected QueueEntryImpl _queueEntry3; + private long _consumerId; public abstract QueueEntryImpl getQueueEntryImpl(int msgId); @@ -136,9 +139,11 @@ public abstract class QueueEntryImplTestBase extends TestCase private QueueConsumer newConsumer() { - final ConsumerTarget target = mock(ConsumerTarget.class); - when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class)); - final QueueConsumer consumer = new QueueConsumer(null,null,true,true,"mock",false,target); + final QueueConsumer consumer = mock(QueueConsumer.class); + + MessageInstance.ConsumerAcquiredState owningState = new QueueEntryImpl.ConsumerAcquiredState(consumer); + when(consumer.getOwningState()).thenReturn(owningState); + when(consumer.getId()).thenReturn(_consumerId++); return consumer; } @@ -204,7 +209,10 @@ public abstract class QueueEntryImplTestBase extends TestCase Map<String,Object> queueAttributes = new HashMap<String, Object>(); queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, getName()); - StandardQueue queue = new StandardQueue(mock(VirtualHost.class), queueAttributes); + final VirtualHost virtualHost = mock(VirtualHost.class); + when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); + + StandardQueue queue = new StandardQueue(virtualHost, queueAttributes); OrderedQueueEntryList queueEntryList = queue.getEntries(); // create test entries diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java index 9813ab98b5..87cda34cc9 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.security.*; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.HashMap; @@ -44,7 +45,10 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase Map<String,Object> queueAttributes = new HashMap<String, Object>(); queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, "SimpleQueueEntryImplTest"); - StandardQueue queue = new StandardQueue(mock(VirtualHost.class), queueAttributes); + final VirtualHost virtualHost = mock(VirtualHost.class); + when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); + + StandardQueue queue = new StandardQueue(virtualHost, queueAttributes); queueEntryList = queue.getEntries(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index be2948c18d..6dfc067db7 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.security.*; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Arrays; @@ -86,7 +87,10 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase<SortedQueue attributes.put(Queue.SORT_KEY, "KEY"); // Create test list - _testQueue = new SortedQueue(mock(VirtualHost.class), attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>() + final VirtualHost virtualHost = mock(VirtualHost.class); + when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); + + _testQueue = new SortedQueue(virtualHost, attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>() { @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java index 2f1122fa9a..c6bc126510 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.security.*; import org.apache.qpid.server.virtualhost.VirtualHost; import static org.mockito.Matchers.eq; @@ -54,7 +55,10 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); attributes.put(Queue.SORT_KEY, "KEY"); - SortedQueue queue = new SortedQueue(mock(VirtualHost.class), attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>() + final VirtualHost virtualHost = mock(VirtualHost.class); + when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); + + SortedQueue queue = new SortedQueue(virtualHost, attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>() { @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java index 5da991b8dd..302c1f1870 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java @@ -20,9 +20,14 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.security.*; +import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.HashMap; @@ -50,7 +55,12 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase<StandardQ Map<String,Object> queueAttributes = new HashMap<String, Object>(); queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, getName()); - _testQueue = new StandardQueue(mock(VirtualHost.class), queueAttributes); + final VirtualHost virtualHost = mock(VirtualHost.class); + when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); + final LogActor logActor = mock(LogActor.class); + when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class)); + CurrentActor.set(logActor); + _testQueue = new StandardQueue(virtualHost, queueAttributes); _sqel = _testQueue.getEntries(); for(int i = 1; i <= 100; i++) @@ -93,7 +103,9 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase<StandardQ Map<String,Object> queueAttributes = new HashMap<String, Object>(); queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, getName()); - StandardQueue queue = new StandardQueue(mock(VirtualHost.class), queueAttributes); + final VirtualHost virtualHost = mock(VirtualHost.class); + when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); + StandardQueue queue = new StandardQueue(virtualHost, queueAttributes); return queue.getEntries(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index c5c68ecf45..96d9ee429d 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -47,6 +47,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.Transaction.Record; import org.apache.qpid.test.utils.QpidTestCase; @@ -103,6 +104,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class); _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); _virtualHost = mock(VirtualHost.class); + when(_virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); @@ -362,6 +364,9 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest when(queue.isExclusive()).thenReturn(exclusive); when(queue.getId()).thenReturn(_queueId); when(queue.getAlternateExchange()).thenReturn(alternateExchange); + final org.apache.qpid.server.virtualhost.VirtualHost vh = mock(org.apache.qpid.server.virtualhost.VirtualHost.class); + when(vh.getSecurityManager()).thenReturn(mock(SecurityManager.class)); + when(queue.getVirtualHost()).thenReturn(vh); final Map<String,Object> attributes = arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments); attributes.put(Queue.NAME, queueName); if(exclusive) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java index 993e9ee4a8..3e1183d203 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java @@ -22,8 +22,7 @@ package org.apache.qpid.server.txn; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.MockMessageInstance; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TransactionLogResource; @@ -34,6 +33,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * A unit test ensuring that AutoCommitTransaction creates a separate transaction for * each dequeue/enqueue operation that involves enlistable messages. Verifies @@ -46,8 +48,8 @@ public class AutoCommitTransactionTest extends QpidTestCase private ServerTransaction _transaction = null; // Class under test private MessageStore _transactionLog; - private AMQQueue _queue; - private List<AMQQueue> _queues; + private BaseQueue _queue; + private List<BaseQueue> _queues; private Collection<MessageInstance> _queueEntries; private ServerMessage _message; private MockAction _action; @@ -382,7 +384,7 @@ public class AutoCommitTransactionTest extends QpidTestCase for(int i = 0; i < queueDurableFlags.length; i++) { - final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]); + final BaseQueue queue = createTestAMQQueue(queueDurableFlags[i]); final ServerMessage message = createTestMessage(messagePersistentFlags[i]); queueEntries.add(new MockMessageInstance() @@ -411,9 +413,9 @@ public class AutoCommitTransactionTest extends QpidTestCase return new MockStoreTransaction(throwException); } - private List<AMQQueue> createTestBaseQueues(boolean[] durableFlags) + private List<BaseQueue> createTestBaseQueues(boolean[] durableFlags) { - List<AMQQueue> queues = new ArrayList<AMQQueue>(); + List<BaseQueue> queues = new ArrayList<BaseQueue>(); for (boolean b: durableFlags) { queues.add(createTestAMQQueue(b)); @@ -422,17 +424,11 @@ public class AutoCommitTransactionTest extends QpidTestCase return queues; } - private AMQQueue createTestAMQQueue(final boolean durable) + private BaseQueue createTestAMQQueue(final boolean durable) { - return new MockAMQQueue("mockQueue") - { - @Override - public boolean isDurable() - { - return durable; - } - - }; + BaseQueue queue = mock(BaseQueue.class); + when(queue.isDurable()).thenReturn(durable); + return queue; } private ServerMessage createTestMessage(final boolean persistent) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java index bdfdb55c7e..58c7401c60 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java @@ -22,8 +22,7 @@ package org.apache.qpid.server.txn; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.MockMessageInstance; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TransactionLogResource; @@ -34,6 +33,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * A unit test ensuring that LocalTransactionTest creates a long-lived store transaction * that spans many dequeue/enqueue operations of enlistable messages. Verifies @@ -45,8 +47,8 @@ public class LocalTransactionTest extends QpidTestCase { private ServerTransaction _transaction = null; // Class under test - private AMQQueue _queue; - private List<AMQQueue> _queues; + private BaseQueue _queue; + private List<BaseQueue> _queues; private Collection<MessageInstance> _queueEntries; private ServerMessage _message; private MockAction _action1; @@ -77,7 +79,7 @@ public class LocalTransactionTest extends QpidTestCase public void testEnqueueToNonDurableQueueOfNonPersistentMessage() throws Exception { _message = createTestMessage(false); - _queue = createTestAMQQueue(false); + _queue = createQueue(false); _transaction.enqueue(_queue, _message, _action1); @@ -93,7 +95,7 @@ public class LocalTransactionTest extends QpidTestCase public void testEnqueueToDurableQueueOfPersistentMessage() throws Exception { _message = createTestMessage(true); - _queue = createTestAMQQueue(true); + _queue = createQueue(true); _transaction.enqueue(_queue, _message, _action1); @@ -109,7 +111,7 @@ public class LocalTransactionTest extends QpidTestCase public void testStoreEnqueueCausesException() throws Exception { _message = createTestMessage(true); - _queue = createTestAMQQueue(true); + _queue = createQueue(true); _storeTransaction = createTestStoreTransaction(true); _transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction); @@ -217,7 +219,7 @@ public class LocalTransactionTest extends QpidTestCase public void testDequeueFromNonDurableQueueOfNonPersistentMessage() throws Exception { _message = createTestMessage(false); - _queue = createTestAMQQueue(false); + _queue = createQueue(false); _transaction.dequeue(_queue, _message, _action1); @@ -234,7 +236,7 @@ public class LocalTransactionTest extends QpidTestCase public void testDequeueFromDurableQueueOfPersistentMessage() throws Exception { _message = createTestMessage(true); - _queue = createTestAMQQueue(true); + _queue = createQueue(true); _transaction.dequeue(_queue, _message, _action1); @@ -250,7 +252,7 @@ public class LocalTransactionTest extends QpidTestCase public void testStoreDequeueCausesException() throws Exception { _message = createTestMessage(true); - _queue = createTestAMQQueue(true); + _queue = createQueue(true); _storeTransaction = createTestStoreTransaction(true); _transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction); @@ -396,7 +398,7 @@ public class LocalTransactionTest extends QpidTestCase { _message = createTestMessage(true); - _queue = createTestAMQQueue(true); + _queue = createQueue(true); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); assertFalse("Post commit action must not be fired yet", _action1.isPostCommitActionFired()); @@ -419,7 +421,7 @@ public class LocalTransactionTest extends QpidTestCase { _message = createTestMessage(true); - _queue = createTestAMQQueue(true); + _queue = createQueue(true); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); @@ -445,7 +447,7 @@ public class LocalTransactionTest extends QpidTestCase { _message = createTestMessage(true); - _queue = createTestAMQQueue(true); + _queue = createQueue(true); _transaction.addPostTransactionAction(_action1); _transaction.dequeue(_queue, _message, _action2); @@ -467,7 +469,7 @@ public class LocalTransactionTest extends QpidTestCase public void testRollbackWorkWithAdditionalPostAction() throws Exception { _message = createTestMessage(true); - _queue = createTestAMQQueue(true); + _queue = createQueue(true); _transaction.addPostTransactionAction(_action1); _transaction.dequeue(_queue, _message, _action2); @@ -488,7 +490,7 @@ public class LocalTransactionTest extends QpidTestCase assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime()); _message = createTestMessage(true); - _queue = createTestAMQQueue(true); + _queue = createQueue(true); long startTime = System.currentTimeMillis(); _transaction.enqueue(_queue, _message, _action1); @@ -503,7 +505,7 @@ public class LocalTransactionTest extends QpidTestCase assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime()); _message = createTestMessage(true); - _queue = createTestAMQQueue(true); + _queue = createQueue(true); _transaction.enqueue(_queue, _message, _action1); @@ -526,7 +528,7 @@ public class LocalTransactionTest extends QpidTestCase assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime()); _message = createTestMessage(true); - _queue = createTestAMQQueue(true); + _queue = createQueue(true); long startTime = System.currentTimeMillis(); _transaction.dequeue(_queue, _message, _action1); @@ -541,7 +543,7 @@ public class LocalTransactionTest extends QpidTestCase assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime()); _message = createTestMessage(true); - _queue = createTestAMQQueue(true); + _queue = createQueue(true); _transaction.enqueue(_queue, _message, _action1); @@ -564,7 +566,7 @@ public class LocalTransactionTest extends QpidTestCase assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime()); _message = createTestMessage(true); - _queue = createTestAMQQueue(true); + _queue = createQueue(true); long startTime = System.currentTimeMillis(); _transaction.enqueue(_queue, _message, _action1); @@ -584,7 +586,7 @@ public class LocalTransactionTest extends QpidTestCase assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime()); _message = createTestMessage(true); - _queue = createTestAMQQueue(true); + _queue = createQueue(true); long startTime = System.currentTimeMillis(); _transaction.enqueue(_queue, _message, _action1); @@ -606,7 +608,7 @@ public class LocalTransactionTest extends QpidTestCase for(int i = 0; i < queueDurableFlags.length; i++) { - final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]); + final TransactionLogResource queue = createQueue(queueDurableFlags[i]); final ServerMessage message = createTestMessage(messagePersistentFlags[i]); queueEntries.add(new MockMessageInstance() @@ -635,28 +637,22 @@ public class LocalTransactionTest extends QpidTestCase return new MockStoreTransaction(throwException); } - private List<AMQQueue> createTestBaseQueues(boolean[] durableFlags) + private List<BaseQueue> createTestBaseQueues(boolean[] durableFlags) { - List<AMQQueue> queues = new ArrayList<AMQQueue>(); + List<BaseQueue> queues = new ArrayList<BaseQueue>(); for (boolean b: durableFlags) { - queues.add(createTestAMQQueue(b)); + queues.add(createQueue(b)); } return queues; } - private AMQQueue createTestAMQQueue(final boolean durable) + private BaseQueue createQueue(final boolean durable) { - return new MockAMQQueue("mockQueue") - { - @Override - public boolean isDurable() - { - return durable; - } - - }; + BaseQueue queue = mock(BaseQueue.class); + when(queue.isDurable()).thenReturn(durable); + return queue; } private ServerMessage createTestMessage(final boolean persistent) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index ce059db703..d4af18aaec 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -197,7 +197,7 @@ public class ServerConnectionDelegate extends ServerDelegate try { - vhost.getSecurityManager().accessVirtualhost(vhostName); + vhost.getSecurityManager().authoriseCreateConnection(sconn); } catch (AccessControlException e) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java index 582f951342..a29d56605a 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java @@ -83,7 +83,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con // Check virtualhost access try { - virtualHost.getSecurityManager().accessVirtualhost(virtualHostName); + virtualHost.getSecurityManager().authoriseCreateConnection(session); } catch (AccessControlException e) { diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index 008f16883a..1e2c7b0652 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.management.amqp; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.util.StateChangeListener; @@ -88,8 +89,9 @@ class ManagementNodeConsumer implements Consumer } @Override - public void setNoLocal(final boolean noLocal) + public MessageSource getMessageSource() { + return _managementNode; } @Override |
