/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include #include #include #include #include #include #include "BrokerChannel.h" #include "DeletingTxOp.h" #include "framing/ChannelAdapter.h" #include #include #include #include #include #include #include #include "BrokerAdapter.h" #include "Connection.h" using std::mem_fun_ref; using std::bind2nd; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; Channel::Channel( Connection& con, ChannelId id, uint32_t _framesize, MessageStore* const _store, uint64_t _stagingThreshold ) : ChannelAdapter(id, &con.getOutput(), con.getVersion()), connection(con), currentDeliveryTag(1), transactional(false), prefetchSize(0), prefetchCount(0), framesize(_framesize), tagGenerator("sgen"), accumulatedAck(0), store(_store), messageBuilder(this, _store, _stagingThreshold), opened(id == 0),//channel 0 is automatically open, other must be explicitly opened adapter(new BrokerAdapter(*this, con, con.broker)) { outstanding.reset(); } Channel::~Channel(){ close(); } bool Channel::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } // TODO aconway 2007-02-12: Why is connection token passed in instead // of using the channel's parent connection? void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); std::auto_ptr c( new ConsumerImpl(this, tagInOut, queue, connection, acks)); queue->consume(c.get(), exclusive);//may throw exception consumers.insert(tagInOut, c.release()); } void Channel::cancel(const string& tag){ // consumers is a ptr_map so erase will delete the consumer // which will call cancel. ConsumerImplMap::iterator i = consumers.find(tag); if (i != consumers.end()) consumers.erase(i); } void Channel::close(){ opened = false; consumers.clear(); recover(true); } void Channel::begin(){ transactional = true; } void Channel::commit(){ TxAck txAck(accumulatedAck, unacked); txBuffer.enlist(&txAck); if(txBuffer.prepare(store)){ txBuffer.commit(); } accumulatedAck.clear(); } void Channel::rollback(){ txBuffer.rollback(); accumulatedAck.clear(); } void Channel::deliver( Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected) { Mutex::ScopedLock locker(deliveryLock); // Key the delivered messages to the id of the request in which they're sent uint64_t deliveryTag = getNextSendRequestId(); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); outstanding.size += msg->contentSize(); outstanding.count++; } //send deliver method, header and content(s) msg->deliver(*this, consumerTag, deliveryTag, framesize); } bool Channel::checkPrefetch(Message::shared_ptr& msg){ Mutex::ScopedLock locker(deliveryLock); bool countOk = !prefetchCount || prefetchCount > unacked.size(); bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); return countOk && sizeOk; } Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, Queue::shared_ptr _queue, ConnectionToken* const _connection, bool ack ) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), ackExpected(ack), blocked(false) {} bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ if(!connection || connection != msg->getPublisher()){//check for no_local if(ackExpected && !parent->checkPrefetch(msg)){ blocked = true; }else{ blocked = false; parent->deliver(msg, tag, queue, ackExpected); return true; } } return false; } Channel::ConsumerImpl::~ConsumerImpl() { cancel(); } void Channel::ConsumerImpl::cancel(){ if(queue) queue->cancel(this); } void Channel::ConsumerImpl::requestDispatch(){ if(blocked) queue->dispatch(); } void Channel::handleInlineTransfer(Message::shared_ptr msg) { Exchange::shared_ptr exchange = connection.broker.getExchanges().get(msg->getExchange()); if(transactional){ TxPublish* deliverable = new TxPublish(msg); exchange->route( *deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); txBuffer.enlist(new DeletingTxOp(deliverable)); }else{ DeliverableMessage deliverable(msg); exchange->route( deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); } } void Channel::handlePublish(Message* _message){ Message::shared_ptr message(_message); messageBuilder.initialise(message); } void Channel::handleHeader(AMQHeaderBody::shared_ptr header){ messageBuilder.setHeader(header); //at this point, decide based on the size of the message whether we want //to stage it by saving content directly to disk as it arrives } void Channel::handleContent(AMQContentBody::shared_ptr content){ messageBuilder.addContent(content); } void Channel::handleHeartbeat(boost::shared_ptr) { // TODO aconway 2007-01-17: Implement heartbeating. } void Channel::complete(Message::shared_ptr msg) { Exchange::shared_ptr exchange = connection.broker.getExchanges().get(msg->getExchange()); assert(exchange.get()); if(transactional) { std::auto_ptr deliverable(new TxPublish(msg)); exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); txBuffer.enlist(new DeletingTxOp(deliverable.release())); } else { DeliverableMessage deliverable(msg); exchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); } } void Channel::ack(){ ack(getFirstAckRequest(), getLastAckRequest()); } // Used by Basic void Channel::ack(uint64_t deliveryTag, bool multiple){ if (multiple) ack(0, deliveryTag); else ack(deliveryTag, deliveryTag); } void Channel::ack(uint64_t firstTag, uint64_t lastTag){ if(transactional){ accumulatedAck.update(firstTag, lastTag); //TODO: I think the outstanding prefetch size & count should be updated at this point... //TODO: ...this may then necessitate dispatching to consumers }else{ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag)); ack_iterator j = (firstTag == 0) ? unacked.begin() : find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag)); if(i == unacked.end()){ throw ConnectionException(530, "Received ack for unrecognised delivery tag"); }else if(i!=j){ ack_iterator end = ++i; for_each(j, end, mem_fun_ref(&DeliveryRecord::discard)); unacked.erase(unacked.begin(), end); //recalculate the prefetch: outstanding.reset(); for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding)); }else{ i->discard(); i->subtractFrom(&outstanding); unacked.erase(i); } //if the prefetch limit had previously been reached, there may //be messages that can be now be delivered std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); } } void Channel::recover(bool requeue){ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery if(requeue){ outstanding.reset(); std::list copy = unacked; unacked.clear(); for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); } } bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){ Message::shared_ptr msg = queue->dequeue(); if(msg){ Mutex::ScopedLock locker(deliveryLock); uint64_t myDeliveryTag = getNextSendRequestId(); msg->sendGetOk(MethodContext(this, msg->getRespondTo()), destination, queue->getMessageCount() + 1, myDeliveryTag, framesize); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } return true; }else{ return false; } } void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag) { msg->deliver(*this, consumerTag, deliveryTag, framesize); } void Channel::handleMethodInContext( boost::shared_ptr method, const MethodContext& context ) { try{ if(getId() != 0 && !method->isA() && !isOpen()) { std::stringstream out; out << "Attempt to use unopened channel: " << getId(); throw ConnectionException(504, out.str()); } else { method->invoke(*adapter, context); } }catch(ChannelException& e){ adapter->getProxy().getChannel().close( e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); connection.closeChannel(getId()); }catch(ConnectionException& e){ connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); }catch(std::exception& e){ connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } }