summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker/BrokerChannel.cpp')
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp95
1 files changed, 44 insertions, 51 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 07636216a6..74e5504f17 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -25,6 +25,8 @@
#include <algorithm>
#include <functional>
+#include <boost/bind.hpp>
+
#include "BrokerChannel.h"
#include "DeletingTxOp.h"
#include "framing/ChannelAdapter.h"
@@ -50,7 +52,7 @@ Channel::Channel(
u_int32_t _framesize, MessageStore* const _store,
u_int64_t _stagingThreshold
) :
- ChannelAdapter(id, &con.getOutput(), con.client->getProtocolVersion()),
+ ChannelAdapter(id, &con.getOutput(), con.getVersion()),
connection(con),
currentDeliveryTag(1),
transactional(false),
@@ -74,46 +76,32 @@ bool Channel::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
-void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks,
+// TODO aconway 2007-02-12: Why is connection token passed in instead
+// of using the channel's parent connection?
+void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection,
const FieldTable*)
{
- if(tag.empty()) tag = tagGenerator.generate();
- ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
- try{
- queue->consume(c, exclusive);//may throw exception
- consumers[tag] = c;
- } catch(...) {
- // FIXME aconway 2007-02-06: auto_ptr for exception safe mem. mgmt.
- delete c;
- throw;
- }
-}
-
-void Channel::cancel(consumer_iterator i){
- ConsumerImpl* c = i->second;
- consumers.erase(i);
- if(c){
- c->cancel();
- delete c;
- }
+ if(tagInOut.empty())
+ tagInOut = tagGenerator.generate();
+ std::auto_ptr<ConsumerImpl> c(
+ new ConsumerImpl(this, tagInOut, queue, connection, acks));
+ queue->consume(c.get(), exclusive);//may throw exception
+ consumers.insert(tagInOut, c.release());
}
void Channel::cancel(const string& tag){
- consumer_iterator i = consumers.find(tag);
- if(i != consumers.end()){
- cancel(i);
- }
+ // consumers is a ptr_map so erase will delete the consumer
+ // which will call cancel.
+ ConsumerImplMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ consumers.erase(i);
}
void Channel::close(){
- if (isOpen()) {
- opened = false;
- while (!consumers.empty())
- cancel(consumers.begin());
- //requeue:
- recover(true);
- }
+ opened = false;
+ consumers.clear();
+ recover(true);
}
void Channel::begin(){
@@ -160,14 +148,10 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){
}
Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,
- Queue::shared_ptr _queue,
- ConnectionToken* const _connection, bool ack) : parent(_parent),
- tag(_tag),
- queue(_queue),
- connection(_connection),
- ackExpected(ack),
- blocked(false){
-}
+ Queue::shared_ptr _queue,
+ ConnectionToken* const _connection, bool ack
+) : parent(_parent), tag(_tag), queue(_queue), connection(_connection),
+ ackExpected(ack), blocked(false) {}
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
if(!connection || connection != msg->getPublisher()){//check for no_local
@@ -182,12 +166,18 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
return false;
}
+Channel::ConsumerImpl::~ConsumerImpl() {
+ cancel();
+}
+
void Channel::ConsumerImpl::cancel(){
- if(queue) queue->cancel(this);
+ if(queue)
+ queue->cancel(this);
}
void Channel::ConsumerImpl::requestDispatch(){
- if(blocked) queue->dispatch();
+ if(blocked)
+ queue->dispatch();
}
void Channel::handleInlineTransfer(Message::shared_ptr msg)
@@ -196,11 +186,15 @@ void Channel::handleInlineTransfer(Message::shared_ptr msg)
connection.broker.getExchanges().get(msg->getExchange());
if(transactional){
TxPublish* deliverable = new TxPublish(msg);
- exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ exchange->route(
+ *deliverable, msg->getRoutingKey(),
+ &(msg->getApplicationHeaders()));
txBuffer.enlist(new DeletingTxOp(deliverable));
}else{
DeliverableMessage deliverable(msg);
- exchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ exchange->route(
+ deliverable, msg->getRoutingKey(),
+ &(msg->getApplicationHeaders()));
}
}
@@ -244,7 +238,8 @@ void Channel::ack(){
ack(getRequestInProgress(), false);
}
-void Channel::ack(u_int64_t deliveryTag, bool multiple){
+void Channel::ack(u_int64_t deliveryTag, bool multiple)
+{
if(transactional){
accumulatedAck.update(deliveryTag, multiple);
//TODO: I think the outstanding prefetch size & count should be updated at this point...
@@ -271,9 +266,8 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){
//if the prefetch limit had previously been reached, there may
//be messages that can be now be delivered
- for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
- j->second->requestDispatch();
- }
+ std::for_each(consumers.begin(), consumers.end(),
+ boost::bind(&ConsumerImpl::requestDispatch, _1));
}
}
@@ -328,8 +322,8 @@ void Channel::handleMethodInContext(
method->invoke(*adapter, context);
}
}catch(ChannelException& e){
- connection.client->getChannel().close(
- context, e.code, e.toString(),
+ adapter->getProxy().getChannel().close(
+ e.code, e.toString(),
method->amqpClassId(), method->amqpMethodId());
connection.closeChannel(getId());
}catch(ConnectionException& e){
@@ -338,4 +332,3 @@ void Channel::handleMethodInContext(
connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
}
}
-