/* * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include #include "Basic.h" #include "AMQMethodBody.h" #include "ClientChannel.h" #include "ReturnedMessageHandler.h" #include "MessageListener.h" #include "framing/FieldTable.h" #include "Connection.h" using namespace std; namespace qpid { namespace client { using namespace sys; using namespace framing; Basic::Basic(Channel& ch) : channel(ch), returnsHandler(0) {} void Basic::consume( Queue& queue, std::string& tag, MessageListener* listener, AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { channel.sendAndReceiveSync( synch, new BasicConsumeBody( channel.version, 0, queue.getName(), tag, noLocal, ackMode == NO_ACK, false, !synch, fields ? *fields : FieldTable())); if (synch) { BasicConsumeOkBody::shared_ptr response = boost::shared_polymorphic_downcast( channel.responses.getResponse()); tag = response->getConsumerTag(); } // FIXME aconway 2007-02-20: Race condition! // We could receive the first message for the consumer // before we create the consumer below. // Move consumer creation to handler for BasicConsumeOkBody { Mutex::ScopedLock l(lock); ConsumerMap::iterator i = consumers.find(tag); if (i != consumers.end()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer already exists with tag="+tag); Consumer& c = consumers[tag]; c.listener = listener; c.ackMode = ackMode; c.lastDeliveryTag = 0; } } void Basic::cancel(const std::string& tag, bool synch) { Consumer c; { Mutex::ScopedLock l(lock); ConsumerMap::iterator i = consumers.find(tag); if (i == consumers.end()) return; c = i->second; consumers.erase(i); } if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); channel.sendAndReceiveSync( synch, new BasicCancelBody(channel.version, tag, !synch)); } void Basic::cancelAll(){ ConsumerMap consumersCopy; { Mutex::ScopedLock l(lock); consumersCopy = consumers; consumers.clear(); } for (ConsumerMap::iterator i=consumersCopy.begin(); i != consumersCopy.end(); ++i) { Consumer& c = i->second; if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) && c.lastDeliveryTag > 0) { channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); } } } bool Basic::get(Message& msg, const Queue& queue, AckMode ackMode) { // Expect a message starting with a BasicGetOk incoming.startGet(); channel.send(new BasicGetBody(channel.version, 0, queue.getName(), ackMode)); return incoming.waitGet(msg); } void Basic::publish( const Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate) { const string e = exchange.getName(); string key = routingKey; channel.send(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate)); //break msg up into header frame and content frame(s) and send these channel.send(msg.getHeader()); string data = msg.getData(); uint64_t data_length = data.length(); if(data_length > 0){ //frame itself uses 8 bytes uint32_t frag_size = channel.connection->getMaxFrameSize() - 8; if(data_length < frag_size){ channel.send(new AMQContentBody(data)); }else{ uint32_t offset = 0; uint32_t remaining = data_length - offset; while (remaining > 0) { uint32_t length = remaining > frag_size ? frag_size : remaining; string frag(data.substr(offset, length)); channel.send(new AMQContentBody(frag)); offset += length; remaining = data_length - offset; } } } } void Basic::handle(boost::shared_ptr method) { assert(method->amqpClassId() ==BasicGetBody::CLASS_ID); switch(method->amqpMethodId()) { case BasicDeliverBody::METHOD_ID: case BasicReturnBody::METHOD_ID: case BasicGetOkBody::METHOD_ID: case BasicGetEmptyBody::METHOD_ID: incoming.add(method); return; } throw Channel::UnknownMethod(); } void Basic::deliver(Consumer& consumer, Message& msg){ //record delivery tag: consumer.lastDeliveryTag = msg.getDeliveryTag(); //allow registered listener to handle the message consumer.listener->received(msg); if(channel.isOpen()){ bool multiple(false); switch(consumer.ackMode){ case LAZY_ACK: multiple = true; if(++(consumer.count) < channel.getPrefetch()) break; //else drop-through case AUTO_ACK: consumer.lastDeliveryTag = 0; channel.send( new BasicAckBody( channel.version, msg.getDeliveryTag(), multiple)); case NO_ACK: // Nothing to do case CLIENT_ACK: // User code must ack. break; // TODO aconway 2007-02-22: Provide a way for user // to ack! } } //as it stands, transactionality is entirely orthogonal to ack //mode, though the acks will not be processed by the broker under //a transaction until it commits. } void Basic::run() { while(channel.isOpen()) { try { Message msg = incoming.waitDispatch(); if(msg.getMethod()->isA()) { ReturnedMessageHandler* handler=0; { Mutex::ScopedLock l(lock); handler=returnsHandler; } if(handler == 0) { // TODO aconway 2007-02-20: proper logging. cout << "Message returned: " << msg.getData() << endl; } else handler->returned(msg); } else { BasicDeliverBody::shared_ptr deliverBody = boost::shared_polymorphic_downcast( msg.getMethod()); std::string tag = deliverBody->getConsumerTag(); Consumer consumer; { Mutex::ScopedLock l(lock); ConsumerMap::iterator i = consumers.find(tag); if(i == consumers.end()) THROW_QPID_ERROR(PROTOCOL_ERROR+504, "Unknown consumer tag=" + tag); consumer = i->second; } deliver(consumer, msg); } } catch (const ShutdownException&) { /* Orderly shutdown */ } catch (const Exception& e) { // FIXME aconway 2007-02-20: Report exception to user. cout << "client::Basic::run() terminated by: " << e.toString() << "(" << typeid(e).name() << ")" << endl; } } } void Basic::setReturnedMessageHandler(ReturnedMessageHandler* handler){ Mutex::ScopedLock l(lock); returnsHandler = handler; } void Basic::setQos(){ channel.sendAndReceive( new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)); if(channel.isTransactional()) channel.sendAndReceive(new TxSelectBody(channel.version)); } // TODO aconway 2007-02-22: NOTES: // Move incoming to BasicChannel - check for uses. }} // namespace qpid::client