diff options
| author | Alan Conway <aconway@apache.org> | 2007-02-13 02:41:14 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-02-13 02:41:14 +0000 |
| commit | 9517deedff9691dbe3429b0b917dfd4208b0b1b8 (patch) | |
| tree | f8868a2fbc63e92c770b401eeff2aee3a522697a /cpp/lib/broker/BrokerChannel.cpp | |
| parent | d26ea3376f66f69486fe214c8a7a8b96a7605c99 (diff) | |
| download | qpid-python-9517deedff9691dbe3429b0b917dfd4208b0b1b8.tar.gz | |
* gentools/templ.cpp/*Proxy*, CppGenerator.java: Changes to Proxy
classes to make them directly usable as an API for low-level AMQP access.
- Proxies hold reference to a ChannelAdapter not just an output handler.
- Removed MethodContext parameter, makes no sense on requester end.
- Return RequestId from request methods so caller can correlate
incoming responses.
- Add RequestId parameter to response methods so caller can provide
correlation for outgoing responses.
- No longer inherit from *Operations classes as the signatures no
longer match. Proxy is for caller (client/requester) and Operations
is for callee (server/responder)
* cpp/lib/client/ClientChannel.h: Channel provides a raw proxy to the broker.
Normal users will still use the Channel API to deal with the broker, but
advanced users (incl ourselves!) can use the raw API to directly send
and receive any AMQP message.
* cpp/lib/broker/BrokerChannel,BrokerAdapter: Refactor for new proxies.
broker::Channel is also a ClientProxy
* Sundry files:
- Pass ProtcolVersion by value, it is only two bytes.
- Misc. const correctness fixes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@506823 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/BrokerChannel.cpp')
| -rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 95 |
1 files changed, 44 insertions, 51 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 07636216a6..74e5504f17 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -25,6 +25,8 @@ #include <algorithm> #include <functional> +#include <boost/bind.hpp> + #include "BrokerChannel.h" #include "DeletingTxOp.h" #include "framing/ChannelAdapter.h" @@ -50,7 +52,7 @@ Channel::Channel( u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold ) : - ChannelAdapter(id, &con.getOutput(), con.client->getProtocolVersion()), + ChannelAdapter(id, &con.getOutput(), con.getVersion()), connection(con), currentDeliveryTag(1), transactional(false), @@ -74,46 +76,32 @@ bool Channel::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, +// TODO aconway 2007-02-12: Why is connection token passed in instead +// of using the channel's parent connection? +void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) { - if(tag.empty()) tag = tagGenerator.generate(); - ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); - try{ - queue->consume(c, exclusive);//may throw exception - consumers[tag] = c; - } catch(...) { - // FIXME aconway 2007-02-06: auto_ptr for exception safe mem. mgmt. - delete c; - throw; - } -} - -void Channel::cancel(consumer_iterator i){ - ConsumerImpl* c = i->second; - consumers.erase(i); - if(c){ - c->cancel(); - delete c; - } + if(tagInOut.empty()) + tagInOut = tagGenerator.generate(); + std::auto_ptr<ConsumerImpl> c( + new ConsumerImpl(this, tagInOut, queue, connection, acks)); + queue->consume(c.get(), exclusive);//may throw exception + consumers.insert(tagInOut, c.release()); } void Channel::cancel(const string& tag){ - consumer_iterator i = consumers.find(tag); - if(i != consumers.end()){ - cancel(i); - } + // consumers is a ptr_map so erase will delete the consumer + // which will call cancel. + ConsumerImplMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + consumers.erase(i); } void Channel::close(){ - if (isOpen()) { - opened = false; - while (!consumers.empty()) - cancel(consumers.begin()); - //requeue: - recover(true); - } + opened = false; + consumers.clear(); + recover(true); } void Channel::begin(){ @@ -160,14 +148,10 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){ } Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, - Queue::shared_ptr _queue, - ConnectionToken* const _connection, bool ack) : parent(_parent), - tag(_tag), - queue(_queue), - connection(_connection), - ackExpected(ack), - blocked(false){ -} + Queue::shared_ptr _queue, + ConnectionToken* const _connection, bool ack +) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), + ackExpected(ack), blocked(false) {} bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ if(!connection || connection != msg->getPublisher()){//check for no_local @@ -182,12 +166,18 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ return false; } +Channel::ConsumerImpl::~ConsumerImpl() { + cancel(); +} + void Channel::ConsumerImpl::cancel(){ - if(queue) queue->cancel(this); + if(queue) + queue->cancel(this); } void Channel::ConsumerImpl::requestDispatch(){ - if(blocked) queue->dispatch(); + if(blocked) + queue->dispatch(); } void Channel::handleInlineTransfer(Message::shared_ptr msg) @@ -196,11 +186,15 @@ void Channel::handleInlineTransfer(Message::shared_ptr msg) connection.broker.getExchanges().get(msg->getExchange()); if(transactional){ TxPublish* deliverable = new TxPublish(msg); - exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + exchange->route( + *deliverable, msg->getRoutingKey(), + &(msg->getApplicationHeaders())); txBuffer.enlist(new DeletingTxOp(deliverable)); }else{ DeliverableMessage deliverable(msg); - exchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + exchange->route( + deliverable, msg->getRoutingKey(), + &(msg->getApplicationHeaders())); } } @@ -244,7 +238,8 @@ void Channel::ack(){ ack(getRequestInProgress(), false); } -void Channel::ack(u_int64_t deliveryTag, bool multiple){ +void Channel::ack(u_int64_t deliveryTag, bool multiple) +{ if(transactional){ accumulatedAck.update(deliveryTag, multiple); //TODO: I think the outstanding prefetch size & count should be updated at this point... @@ -271,9 +266,8 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ //if the prefetch limit had previously been reached, there may //be messages that can be now be delivered - for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ - j->second->requestDispatch(); - } + std::for_each(consumers.begin(), consumers.end(), + boost::bind(&ConsumerImpl::requestDispatch, _1)); } } @@ -328,8 +322,8 @@ void Channel::handleMethodInContext( method->invoke(*adapter, context); } }catch(ChannelException& e){ - connection.client->getChannel().close( - context, e.code, e.toString(), + adapter->getProxy().getChannel().close( + e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); connection.closeChannel(getId()); }catch(ConnectionException& e){ @@ -338,4 +332,3 @@ void Channel::handleMethodInContext( connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } } - |
