diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionAdapter.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 28 |
1 files changed, 12 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index cb2fe15b58..1ea18ea472 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -16,7 +16,10 @@ * */ #include "qpid/broker/SessionAdapter.h" + +#include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/DtxTimeout.h" #include "qpid/broker/Queue.h" #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" @@ -98,17 +101,6 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const //exchange already there, not created checkType(response.first, type); checkAlternate(response.first, alternate); - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), - getConnection().getUserId(), - exchange, - type, - alternateExchange, - durable, - false, - ManagementAgent::toMap(args), - "existing")); QPID_LOG_CAT(debug, model, "Create exchange. name:" << exchange << " user:" << getConnection().getUserId() << " rhost:" << getConnection().getUrl() @@ -165,12 +157,14 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, { getBroker().bind(queueName, exchangeName, routingKey, arguments, getConnection().getUserId(), getConnection().getUrl()); + state.addBinding(queueName, exchangeName, routingKey, arguments); } void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, const string& exchangeName, const string& routingKey) { + state.removeBinding(queueName, exchangeName, routingKey); getBroker().unbind(queueName, exchangeName, routingKey, getConnection().getUserId(), getConnection().getUrl()); } @@ -300,6 +294,8 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& } catch (const qpid::types::Exception& e) { throw InvalidArgumentException(e.what()); } + // Identify queues that won't survive a failover. + settings.isTemporary = exclusive && autoDelete && !settings.autoDeleteDelay; std::pair<Queue::shared_ptr, bool> queue_created = getBroker().createQueue(name, settings, @@ -318,11 +314,6 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& if (exclusive && queue->setExclusiveOwner(&session)) { exclusiveQueues.push_back(queue); } - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), - name, durable, exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments), - "existing")); QPID_LOG_CAT(debug, model, "Create queue. name:" << name << " user:" << getConnection().getUserId() << " rhost:" << getConnection().getUrl() @@ -422,6 +413,11 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, if(!destination.empty() && state.exists(destination)) throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); + if (queue->getSettings().isBrowseOnly && acquireMode == 0) { + QPID_LOG(info, "Overriding request to consume from browse-only queue " << queue->getName()); + acquireMode = 1; + } + // We allow browsing (acquireMode == 1) of exclusive queues, this is required by HA. if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session) && acquireMode == 0) throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue " |