/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include #include #include #include #include using std::mem_fun_ref; using std::bind2nd; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; Channel::Channel(qpid::framing::ProtocolVersion& _version, OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) : id(_id), out(_out), currentDeliveryTag(1), transactional(false), prefetchSize(0), prefetchCount(0), framesize(_framesize), tagGenerator("sgen"), accumulatedAck(0), store(_store), messageBuilder(this, _store, _stagingThreshold), version(_version){ outstanding.reset(); } Channel::~Channel(){ } bool Channel::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*){ if(tag.empty()) tag = tagGenerator.generate(); ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); try{ queue->consume(c, exclusive);//may throw exception consumers[tag] = c; }catch(ExclusiveAccessException& e){ delete c; throw e; } } void Channel::cancel(consumer_iterator i){ ConsumerImpl* c = i->second; consumers.erase(i); if(c){ c->cancel(); delete c; } } void Channel::cancel(const string& tag){ consumer_iterator i = consumers.find(tag); if(i != consumers.end()){ cancel(i); } } void Channel::close(){ //cancel all consumers for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ cancel(i); } //requeue: recover(true); } void Channel::begin(){ transactional = true; } void Channel::commit(){ TxAck txAck(accumulatedAck, unacked); txBuffer.enlist(&txAck); if(txBuffer.prepare(store)){ txBuffer.commit(); } accumulatedAck.clear(); } void Channel::rollback(){ txBuffer.rollback(); accumulatedAck.clear(); } void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ Mutex::ScopedLock locker(deliveryLock); u_int64_t deliveryTag = currentDeliveryTag++; if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); outstanding.size += msg->contentSize(); outstanding.count++; } //send deliver method, header and content(s) msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version); } bool Channel::checkPrefetch(Message::shared_ptr& msg){ Mutex::ScopedLock locker(deliveryLock); bool countOk = !prefetchCount || prefetchCount > unacked.size(); bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); return countOk && sizeOk; } Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, Queue::shared_ptr _queue, ConnectionToken* const _connection, bool ack) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), ackExpected(ack), blocked(false){ } bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ if(!connection || connection != msg->getPublisher()){//check for no_local if(ackExpected && !parent->checkPrefetch(msg)){ blocked = true; }else{ blocked = false; parent->deliver(msg, tag, queue, ackExpected); return true; } } return false; } void Channel::ConsumerImpl::cancel(){ if(queue) queue->cancel(this); } void Channel::ConsumerImpl::requestDispatch(){ if(blocked) queue->dispatch(); } void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){ Message::shared_ptr message(_message); exchange = _exchange; messageBuilder.initialise(message); } void Channel::handleHeader(AMQHeaderBody::shared_ptr header){ messageBuilder.setHeader(header); //at this point, decide based on the size of the message whether we want //to stage it by saving content directly to disk as it arrives } void Channel::handleContent(AMQContentBody::shared_ptr content){ messageBuilder.addContent(content); } void Channel::complete(Message::shared_ptr& msg){ if(exchange){ if(transactional){ TxPublish* deliverable = new TxPublish(msg); exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); txBuffer.enlist(new DeletingTxOp(deliverable)); }else{ DeliverableMessage deliverable(msg); exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); } exchange.reset(); }else{ std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl; } } void Channel::ack(u_int64_t deliveryTag, bool multiple){ if(transactional){ accumulatedAck.update(deliveryTag, multiple); //TODO: I think the outstanding prefetch size & count should be updated at this point... //TODO: ...this may then necessitate dispatching to consumers }else{ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag)); if(i == unacked.end()){ throw InvalidAckException(); }else if(multiple){ ack_iterator end = ++i; for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard)); unacked.erase(unacked.begin(), end); //recalculate the prefetch: outstanding.reset(); for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding)); }else{ i->discard(); i->subtractFrom(&outstanding); unacked.erase(i); } //if the prefetch limit had previously been reached, there may //be messages that can be now be delivered for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ j->second->requestDispatch(); } } } void Channel::recover(bool requeue){ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery if(requeue){ outstanding.reset(); std::list copy = unacked; unacked.clear(); for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); } } bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ Message::shared_ptr msg = queue->dequeue(); if(msg){ Mutex::ScopedLock locker(deliveryLock); u_int64_t myDeliveryTag = currentDeliveryTag++; msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } return true; }else{ return false; } } void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){ msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version); }