diff options
Diffstat (limited to 'cpp/lib/broker/BrokerChannel.cpp')
| -rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 95 |
1 files changed, 44 insertions, 51 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 07636216a6..74e5504f17 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -25,6 +25,8 @@ #include <algorithm> #include <functional> +#include <boost/bind.hpp> + #include "BrokerChannel.h" #include "DeletingTxOp.h" #include "framing/ChannelAdapter.h" @@ -50,7 +52,7 @@ Channel::Channel( u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold ) : - ChannelAdapter(id, &con.getOutput(), con.client->getProtocolVersion()), + ChannelAdapter(id, &con.getOutput(), con.getVersion()), connection(con), currentDeliveryTag(1), transactional(false), @@ -74,46 +76,32 @@ bool Channel::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, +// TODO aconway 2007-02-12: Why is connection token passed in instead +// of using the channel's parent connection? +void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) { - if(tag.empty()) tag = tagGenerator.generate(); - ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); - try{ - queue->consume(c, exclusive);//may throw exception - consumers[tag] = c; - } catch(...) { - // FIXME aconway 2007-02-06: auto_ptr for exception safe mem. mgmt. - delete c; - throw; - } -} - -void Channel::cancel(consumer_iterator i){ - ConsumerImpl* c = i->second; - consumers.erase(i); - if(c){ - c->cancel(); - delete c; - } + if(tagInOut.empty()) + tagInOut = tagGenerator.generate(); + std::auto_ptr<ConsumerImpl> c( + new ConsumerImpl(this, tagInOut, queue, connection, acks)); + queue->consume(c.get(), exclusive);//may throw exception + consumers.insert(tagInOut, c.release()); } void Channel::cancel(const string& tag){ - consumer_iterator i = consumers.find(tag); - if(i != consumers.end()){ - cancel(i); - } + // consumers is a ptr_map so erase will delete the consumer + // which will call cancel. + ConsumerImplMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + consumers.erase(i); } void Channel::close(){ - if (isOpen()) { - opened = false; - while (!consumers.empty()) - cancel(consumers.begin()); - //requeue: - recover(true); - } + opened = false; + consumers.clear(); + recover(true); } void Channel::begin(){ @@ -160,14 +148,10 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){ } Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, - Queue::shared_ptr _queue, - ConnectionToken* const _connection, bool ack) : parent(_parent), - tag(_tag), - queue(_queue), - connection(_connection), - ackExpected(ack), - blocked(false){ -} + Queue::shared_ptr _queue, + ConnectionToken* const _connection, bool ack +) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), + ackExpected(ack), blocked(false) {} bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ if(!connection || connection != msg->getPublisher()){//check for no_local @@ -182,12 +166,18 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ return false; } +Channel::ConsumerImpl::~ConsumerImpl() { + cancel(); +} + void Channel::ConsumerImpl::cancel(){ - if(queue) queue->cancel(this); + if(queue) + queue->cancel(this); } void Channel::ConsumerImpl::requestDispatch(){ - if(blocked) queue->dispatch(); + if(blocked) + queue->dispatch(); } void Channel::handleInlineTransfer(Message::shared_ptr msg) @@ -196,11 +186,15 @@ void Channel::handleInlineTransfer(Message::shared_ptr msg) connection.broker.getExchanges().get(msg->getExchange()); if(transactional){ TxPublish* deliverable = new TxPublish(msg); - exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + exchange->route( + *deliverable, msg->getRoutingKey(), + &(msg->getApplicationHeaders())); txBuffer.enlist(new DeletingTxOp(deliverable)); }else{ DeliverableMessage deliverable(msg); - exchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + exchange->route( + deliverable, msg->getRoutingKey(), + &(msg->getApplicationHeaders())); } } @@ -244,7 +238,8 @@ void Channel::ack(){ ack(getRequestInProgress(), false); } -void Channel::ack(u_int64_t deliveryTag, bool multiple){ +void Channel::ack(u_int64_t deliveryTag, bool multiple) +{ if(transactional){ accumulatedAck.update(deliveryTag, multiple); //TODO: I think the outstanding prefetch size & count should be updated at this point... @@ -271,9 +266,8 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ //if the prefetch limit had previously been reached, there may //be messages that can be now be delivered - for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ - j->second->requestDispatch(); - } + std::for_each(consumers.begin(), consumers.end(), + boost::bind(&ConsumerImpl::requestDispatch, _1)); } } @@ -328,8 +322,8 @@ void Channel::handleMethodInContext( method->invoke(*adapter, context); } }catch(ChannelException& e){ - connection.client->getChannel().close( - context, e.code, e.toString(), + adapter->getProxy().getChannel().close( + e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); connection.closeChannel(getId()); }catch(ConnectionException& e){ @@ -338,4 +332,3 @@ void Channel::handleMethodInContext( connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } } - |
