diff options
| author | Alan Conway <aconway@apache.org> | 2007-02-22 23:23:52 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-02-22 23:23:52 +0000 |
| commit | 067f367d27bef7500410ea27c000d0ca275c748a (patch) | |
| tree | b20d2f526860870c22dbcffa3570ed347f8979ba /cpp/lib/client | |
| parent | 20a442ea00c82b7fd9b6b7a560916f69f3155f56 (diff) | |
| download | qpid-python-067f367d27bef7500410ea27c000d0ca275c748a.tar.gz | |
* cpp/lib/client/Basic.*, ClientChannel.*: Extracted Basic functionality
from Channel into separate Basic class.
* cpp/lib/client/*, cpp/test/*: Adjusted for new Channel::getBasic() API.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510705 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client')
| -rw-r--r-- | cpp/lib/client/Basic.cpp | 255 | ||||
| -rw-r--r-- | cpp/lib/client/Basic.h | 195 | ||||
| -rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 264 | ||||
| -rw-r--r-- | cpp/lib/client/ClientChannel.h | 194 | ||||
| -rw-r--r-- | cpp/lib/client/ClientMessage.h | 3 | ||||
| -rw-r--r-- | cpp/lib/client/Connection.h | 1 | ||||
| -rw-r--r-- | cpp/lib/client/Makefile.am | 2 |
7 files changed, 539 insertions, 375 deletions
diff --git a/cpp/lib/client/Basic.cpp b/cpp/lib/client/Basic.cpp new file mode 100644 index 0000000000..88d46541f4 --- /dev/null +++ b/cpp/lib/client/Basic.cpp @@ -0,0 +1,255 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "Basic.h" +#include "AMQMethodBody.h" +#include "ClientChannel.h" +#include "ReturnedMessageHandler.h" +#include "MessageListener.h" +#include "framing/FieldTable.h" +#include "Connection.h" + +using namespace std; + +namespace qpid { +namespace client { + +using namespace sys; +using namespace framing; + +Basic::Basic(Channel& ch) : channel(ch), returnsHandler(0) {} + +void Basic::consume( + Queue& queue, std::string& tag, MessageListener* listener, + AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) +{ + channel.sendAndReceiveSync<BasicConsumeOkBody>( + synch, + new BasicConsumeBody( + channel.version, 0, queue.getName(), tag, noLocal, + ackMode == NO_ACK, false, !synch, + fields ? *fields : FieldTable())); + if (synch) { + BasicConsumeOkBody::shared_ptr response = + boost::shared_polymorphic_downcast<BasicConsumeOkBody>( + channel.responses.getResponse()); + tag = response->getConsumerTag(); + } + // FIXME aconway 2007-02-20: Race condition! + // We could receive the first message for the consumer + // before we create the consumer below. + // Move consumer creation to handler for BasicConsumeOkBody + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + THROW_QPID_ERROR(CLIENT_ERROR, + "Consumer already exists with tag="+tag); + Consumer& c = consumers[tag]; + c.listener = listener; + c.ackMode = ackMode; + c.lastDeliveryTag = 0; + } +} + + +void Basic::cancel(const std::string& tag, bool synch) { + Consumer c; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i == consumers.end()) + return; + c = i->second; + consumers.erase(i); + } + if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) + channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); + channel.sendAndReceiveSync<BasicCancelOkBody>( + synch, new BasicCancelBody(channel.version, tag, !synch)); +} + +void Basic::cancelAll(){ + ConsumerMap consumersCopy; + { + Mutex::ScopedLock l(lock); + consumersCopy = consumers; + consumers.clear(); + } + for (ConsumerMap::iterator i=consumersCopy.begin(); + i != consumersCopy.end(); ++i) + { + Consumer& c = i->second; + if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) + && c.lastDeliveryTag > 0) + { + channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); + } + } +} + + + +bool Basic::get(Message& msg, const Queue& queue, AckMode ackMode) { + // Expect a message starting with a BasicGetOk + incoming.startGet(); + channel.send(new BasicGetBody(channel.version, 0, queue.getName(), ackMode)); + return incoming.waitGet(msg); +} + + +void Basic::publish( + const Message& msg, const Exchange& exchange, + const std::string& routingKey, bool mandatory, bool immediate) +{ + const string e = exchange.getName(); + string key = routingKey; + channel.send(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate)); + //break msg up into header frame and content frame(s) and send these + channel.send(msg.getHeader()); + string data = msg.getData(); + u_int64_t data_length = data.length(); + if(data_length > 0){ + //frame itself uses 8 bytes + u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8; + if(data_length < frag_size){ + channel.send(new AMQContentBody(data)); + }else{ + u_int32_t offset = 0; + u_int32_t remaining = data_length - offset; + while (remaining > 0) { + u_int32_t length = remaining > frag_size ? frag_size : remaining; + string frag(data.substr(offset, length)); + channel.send(new AMQContentBody(frag)); + + offset += length; + remaining = data_length - offset; + } + } + } +} + +void Basic::handle(boost::shared_ptr<AMQMethodBody> method) { + assert(method->amqpClassId() ==BasicGetBody::CLASS_ID); + switch(method->amqpMethodId()) { + case BasicDeliverBody::METHOD_ID: + case BasicReturnBody::METHOD_ID: + case BasicGetOkBody::METHOD_ID: + case BasicGetEmptyBody::METHOD_ID: + incoming.add(method); + return; + } + throw Channel::UnknownMethod(); +} + +void Basic::deliver(Consumer& consumer, Message& msg){ + //record delivery tag: + consumer.lastDeliveryTag = msg.getDeliveryTag(); + + //allow registered listener to handle the message + consumer.listener->received(msg); + + if(channel.isOpen()){ + bool multiple(false); + switch(consumer.ackMode){ + case LAZY_ACK: + multiple = true; + if(++(consumer.count) < channel.getPrefetch()) + break; + //else drop-through + case AUTO_ACK: + consumer.lastDeliveryTag = 0; + channel.send( + new BasicAckBody( + channel.version, msg.getDeliveryTag(), multiple)); + case NO_ACK: // Nothing to do + case CLIENT_ACK: // User code must ack. + break; + // TODO aconway 2007-02-22: Provide a way for user + // to ack! + } + } + + //as it stands, transactionality is entirely orthogonal to ack + //mode, though the acks will not be processed by the broker under + //a transaction until it commits. +} + + +void Basic::run() { + while(channel.isOpen()) { + try { + Message msg = incoming.waitDispatch(); + if(msg.getMethod()->isA<BasicReturnBody>()) { + ReturnedMessageHandler* handler=0; + { + Mutex::ScopedLock l(lock); + handler=returnsHandler; + } + if(handler == 0) { + // TODO aconway 2007-02-20: proper logging. + cout << "Message returned: " << msg.getData() << endl; + } + else + handler->returned(msg); + } + else { + BasicDeliverBody::shared_ptr deliverBody = + boost::shared_polymorphic_downcast<BasicDeliverBody>( + msg.getMethod()); + std::string tag = deliverBody->getConsumerTag(); + Consumer consumer; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if(i == consumers.end()) + THROW_QPID_ERROR(PROTOCOL_ERROR+504, + "Unknown consumer tag=" + tag); + consumer = i->second; + } + deliver(consumer, msg); + } + } + catch (const ShutdownException&) { + /* Orderly shutdown */ + } + catch (const Exception& e) { + // FIXME aconway 2007-02-20: Report exception to user. + cout << "client::Basic::run() terminated by: " << e.toString() + << "(" << typeid(e).name() << ")" << endl; + } + } +} + +void Basic::setReturnedMessageHandler(ReturnedMessageHandler* handler){ + Mutex::ScopedLock l(lock); + returnsHandler = handler; +} + +void Basic::setQos(){ + channel.sendAndReceive<BasicQosOkBody>( + new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)); + if(channel.isTransactional()) + channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version)); +} + + +// TODO aconway 2007-02-22: NOTES: +// Move incoming to BasicChannel - check for uses. + +}} // namespace qpid::client diff --git a/cpp/lib/client/Basic.h b/cpp/lib/client/Basic.h new file mode 100644 index 0000000000..e598f5555b --- /dev/null +++ b/cpp/lib/client/Basic.h @@ -0,0 +1,195 @@ +#ifndef _client_Basic_h +#define _client_Basic_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "IncomingMessage.h" +#include "sys/Runnable.h" + +namespace qpid { + +namespace framing { +class AMQMethodBody; +class FieldTable; +} + +namespace client { + +class Channel; +class Message; +class Queue; +class Exchange; +class MessageListener; +class ReturnedMessageHandler; + +/** + * The available acknowledgements modes. + * + * \ingroup clientapi + */ +enum AckMode { + /** No acknowledgement will be sent, broker can + discard messages as soon as they are delivered + to a consumer using this mode. **/ + NO_ACK = 0, + /** Each message will be automatically + acknowledged as soon as it is delivered to the + application **/ + AUTO_ACK = 1, + /** Acknowledgements will be sent automatically, + but not for each message. **/ + LAZY_ACK = 2, + /** The application is responsible for explicitly + acknowledging messages. **/ + CLIENT_ACK = 3 +}; + + +/** + * Represents the AMQP Basic class for sending and receiving messages. + */ +class Basic : public sys::Runnable +{ + public: + Basic(Channel& parent); + + /** + * Creates a 'consumer' for a queue. Messages in (or arriving + * at) that queue will be delivered to consumers + * asynchronously. + * + * @param queue a Queue instance representing the queue to + * consume from + * + * @param tag an identifier to associate with the consumer + * that can be used to cancel its subscription (if empty, this + * will be assigned by the broker) + * + * @param listener a pointer to an instance of an + * implementation of the MessageListener interface. Messages + * received from this queue for this consumer will result in + * invocation of the received() method on the listener, with + * the message itself passed in. + * + * @param ackMode the mode of acknowledgement that the broker + * should assume for this consumer. @see AckMode + * + * @param noLocal if true, this consumer will not be sent any + * message published by this connection + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void consume( + Queue& queue, std::string& tag, MessageListener* listener, + AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, + const framing::FieldTable* fields = 0); + + /** + * Cancels a subscription previously set up through a call to consume(). + * + * @param tag the identifier used (or assigned) in the consume + * request that set up the subscription to be cancelled. + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void cancel(const std::string& tag, bool synch = true); + /** + * Synchronous pull of a message from a queue. + * + * @param msg a message object that will contain the message + * headers and content if the call completes. + * + * @param queue the queue to consume from + * + * @param ackMode the acknowledgement mode to use (@see + * AckMode) + * + * @return true if a message was succcessfully dequeued from + * the queue, false if the queue was empty. + */ + bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); + + /** + * Publishes (i.e. sends a message to the broker). + * + * @param msg the message to publish + * + * @param exchange the exchange to publish the message to + * + * @param routingKey the routing key to publish with + * + * @param mandatory if true and the exchange to which this + * publish is directed has no matching bindings, the message + * will be returned (see setReturnedMessageHandler()). + * + * @param immediate if true and there is no consumer to + * receive this message on publication, the message will be + * returned (see setReturnedMessageHandler()). + */ + void publish(const Message& msg, const Exchange& exchange, + const std::string& routingKey, + bool mandatory = false, bool immediate = false); + + /** + * Set a handler for this channel that will process any + * returned messages + * + * @see publish() + */ + void setReturnedMessageHandler(ReturnedMessageHandler* handler); + + /** + * Deliver messages from the broker to the appropriate MessageListener. + */ + void run(); + + + private: + + struct Consumer{ + MessageListener* listener; + AckMode ackMode; + int count; + u_int64_t lastDeliveryTag; + }; + + typedef std::map<std::string, Consumer> ConsumerMap; + + void handle(boost::shared_ptr<framing::AMQMethodBody>); + void setQos(); + void cancelAll(); + void deliver(Consumer& consumer, Message& msg); + + sys::Mutex lock; + Channel& channel; + IncomingMessage incoming; + ConsumerMap consumers; + ReturnedMessageHandler* returnsHandler; + + // FIXME aconway 2007-02-22: Remove friendship. + friend class Channel; +}; + +}} // namespace qpid::client + + + +#endif /*!_client_Basic_h*/ diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index a8fa219c16..ec9ce71dd7 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -37,6 +37,7 @@ using namespace qpid::framing; using namespace qpid::sys; Channel::Channel(bool _transactional, u_int16_t _prefetch) : + basic(*this), connection(0), prefetch(_prefetch), transactional(_transactional) @@ -113,19 +114,16 @@ void Channel::protocolInit( bool Channel::isOpen() const { return connection; } +void Channel::setQos() { + basic.setQos(); + // FIXME aconway 2007-02-22: message +} + void Channel::setPrefetch(u_int16_t _prefetch){ prefetch = _prefetch; setQos(); } -void Channel::setQos(){ - sendAndReceive<BasicQosOkBody>( - new BasicQosBody(version, 0, prefetch, false)); - if(transactional){ - sendAndReceive<TxSelectOkBody>(new TxSelectBody(version)); - } -} - void Channel::declareExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); string type = exchange.getType(); @@ -177,114 +175,6 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri new QueueBindBody(version, 0, q, e, key,!synch, args)); } -void Channel::consume( - Queue& queue, std::string& tag, MessageListener* listener, - int ackMode, bool noLocal, bool synch, const FieldTable* fields) -{ - sendAndReceiveSync<BasicConsumeOkBody>( - synch, - new BasicConsumeBody( - version, 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable())); - if (synch) { - BasicConsumeOkBody::shared_ptr response = - shared_polymorphic_downcast<BasicConsumeOkBody>( - responses.getResponse()); - tag = response->getConsumerTag(); - } - // FIXME aconway 2007-02-20: Race condition! - // We could receive the first message for the consumer - // before we create the consumer below. - // Move consumer creation to handler for BasicConsumeOkBody - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i != consumers.end()) - THROW_QPID_ERROR(CLIENT_ERROR, - "Consumer already exists with tag="+tag); - Consumer& c = consumers[tag]; - c.listener = listener; - c.ackMode = ackMode; - c.lastDeliveryTag = 0; - } -} - -void Channel::cancel(const std::string& tag, bool synch) { - Consumer c; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i == consumers.end()) - return; - c = i->second; - consumers.erase(i); - } - if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) - send(new BasicAckBody(version, c.lastDeliveryTag, true)); - sendAndReceiveSync<BasicCancelOkBody>( - synch, new BasicCancelBody(version, tag, !synch)); -} - -void Channel::cancelAll(){ - ConsumerMap consumersCopy; - { - Mutex::ScopedLock l(lock); - consumersCopy = consumers; - consumers.clear(); - } - for (ConsumerMap::iterator i=consumersCopy.begin(); - i != consumersCopy.end(); ++i) - { - Consumer& c = i->second; - if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) - && c.lastDeliveryTag > 0) - { - send(new BasicAckBody(version, c.lastDeliveryTag, true)); - } - } -} - -bool Channel::get(Message& msg, const Queue& queue, int ackMode) { - // Expect a message starting with a BasicGetOk - incoming.startGet(); - send(new BasicGetBody(version, 0, queue.getName(), ackMode)); - return incoming.waitGet(msg); -} - - -void Channel::publish( - const Message& msg, const Exchange& exchange, - const std::string& routingKey, bool mandatory, bool immediate) -{ - // FIXME aconway 2007-01-30: Rework for 0-9 message class. - const string e = exchange.getName(); - string key = routingKey; - - send(new BasicPublishBody(version, 0, e, key, mandatory, immediate)); - //break msg up into header frame and content frame(s) and send these - send(msg.header); - string data = msg.getData(); - u_int64_t data_length = data.length(); - if(data_length > 0){ - u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes - if(data_length < frag_size){ - send(new AMQContentBody(data)); - }else{ - u_int32_t offset = 0; - u_int32_t remaining = data_length - offset; - while (remaining > 0) { - u_int32_t length = remaining > frag_size ? frag_size : remaining; - string frag(data.substr(offset, length)); - send(new AMQContentBody(frag)); - - offset += length; - remaining = data_length - offset; - } - } - } -} - void Channel::commit(){ sendAndReceive<TxCommitOkBody>(new TxCommitBody(version)); } @@ -294,46 +184,53 @@ void Channel::rollback(){ } void Channel::handleMethodInContext( - AMQMethodBody::shared_ptr body, const MethodContext&) + AMQMethodBody::shared_ptr method, const MethodContext&) { - //channel.flow, channel.close, basic.deliver, basic.return or a - //response to a synchronous request if(responses.isWaiting()) { - responses.signalResponse(body); + responses.signalResponse(method); return; } - - if(body->isA<BasicDeliverBody>() - || body->isA<BasicReturnBody>() - || body->isA<BasicGetOkBody>() - || body->isA<BasicGetEmptyBody>()) - - { - incoming.add(body); - return; + try { + switch (method->amqpClassId()) { + case BasicDeliverBody::CLASS_ID: basic.handle(method); break; + case ChannelCloseBody::CLASS_ID: handleChannel(method); break; + case ConnectionCloseBody::CLASS_ID: handleConnection(method); break; + default: throw UnknownMethod(); + } } - else if(body->isA<ChannelCloseBody>()) { - peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body)); + catch (const UnknownMethod&) { + connection->close( + 504, "Unknown method", + method->amqpClassId(), method->amqpMethodId()); } - else if(body->isA<ChannelFlowBody>()){ - // TODO aconway 2007-01-24: not implemented yet. +} + +void Channel::handleChannel(AMQMethodBody::shared_ptr method) { + switch (method->amqpMethodId()) { + case ChannelCloseBody::METHOD_ID: + peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method)); + return; + case ChannelFlowBody::METHOD_ID: + // FIXME aconway 2007-02-22: Not yet implemented. + return; } - else if(body->isA<ConnectionCloseBody>()){ + throw UnknownMethod(); +} + +void Channel::handleConnection(AMQMethodBody::shared_ptr method) { + if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) { connection->close(); - } - else { - connection->close( - 504, "Unrecognised method", - body->amqpClassId(), body->amqpMethodId()); - } + return; + } + throw UnknownMethod(); } void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ - incoming.add(body); + basic.incoming.add(body); } void Channel::handleContent(AMQContentBody::shared_ptr body){ - incoming.add(body); + basic.incoming.add(body); } void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ @@ -341,82 +238,7 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ } void Channel::start(){ - dispatcher = Thread(this); -} - -void Channel::deliver(Consumer& consumer, Message& msg){ - //record delivery tag: - consumer.lastDeliveryTag = msg.getDeliveryTag(); - - //allow registered listener to handle the message - consumer.listener->received(msg); - - if(isOpen()){ - bool multiple(false); - switch(consumer.ackMode){ - case LAZY_ACK: - multiple = true; - if(++(consumer.count) < prefetch) break; - //else drop-through - case AUTO_ACK: - send(new BasicAckBody(version, msg.getDeliveryTag(), multiple)); - consumer.lastDeliveryTag = 0; - } - } - - //as it stands, transactionality is entirely orthogonal to ack - //mode, though the acks will not be processed by the broker under - //a transaction until it commits. -} - -void Channel::run() { - while(isOpen()) { - try { - Message msg = incoming.waitDispatch(); - if(msg.getMethod()->isA<BasicReturnBody>()) { - ReturnedMessageHandler* handler=0; - { - Mutex::ScopedLock l(lock); - handler=returnsHandler; - } - if(handler == 0) { - // TODO aconway 2007-02-20: proper logging. - cout << "Message returned: " << msg.getData() << endl; - } - else - handler->returned(msg); - } - else { - BasicDeliverBody::shared_ptr deliverBody = - boost::shared_polymorphic_downcast<BasicDeliverBody>( - msg.getMethod()); - std::string tag = deliverBody->getConsumerTag(); - Consumer consumer; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if(i == consumers.end()) - THROW_QPID_ERROR(PROTOCOL_ERROR+504, - "Unknown consumer tag=" + tag); - consumer = i->second; - } - deliver(consumer, msg); - } - } - catch (const ShutdownException&) { - /* Orderly shutdown */ - } - catch (const Exception& e) { - // FIXME aconway 2007-02-20: Report exception to user. - cout << "client::Channel::run() terminated by: " << e.toString() - << "(" << typeid(e).name() << ")" << endl; - } - } -} - -void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ - Mutex::ScopedLock l(lock); - returnsHandler = handler; + basicDispatcher = Thread(basic); } // Close called by local application. @@ -452,13 +274,13 @@ void Channel::peerClose(ChannelCloseBody::shared_ptr) { void Channel::closeInternal() { if (isOpen()); { - cancelAll(); - incoming.shutdown(); + basic.cancelAll(); + basic.incoming.shutdown(); connection = 0; // A 0 response means we are closed. responses.signalResponse(AMQMethodBody::shared_ptr()); } - dispatcher.join(); + basicDispatcher.join(); } void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m) diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index 9c422305b0..56fdd57d03 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -21,23 +21,15 @@ * under the License. * */ -#include <map> -#include <string> -#include <queue> -#include <boost/scoped_ptr.hpp> #include "sys/types.h" - #include <framing/amqp_framing.h> #include <ClientExchange.h> -#include <IncomingMessage.h> #include <ClientMessage.h> -#include <MessageListener.h> #include <ClientQueue.h> #include <ResponseHandler.h> -#include <ReturnedMessageHandler.h> -#include "Runnable.h" #include "ChannelAdapter.h" #include "Thread.h" +#include "Basic.h" namespace qpid { @@ -50,27 +42,6 @@ namespace client { class Connection; -/** - * The available acknowledgements modes - * - * \ingroup clientapi - */ -enum ack_modes { - /** No acknowledgement will be sent, broker can - discard messages as soon as they are delivered - to a consumer using this mode. **/ - NO_ACK = 0, - /** Each message will be automatically - acknowledged as soon as it is delivered to the - application **/ - AUTO_ACK = 1, - /** Acknowledgements will be sent automatically, - but not for each message. **/ - LAZY_ACK = 2, - /** The application is responsible for explicitly - acknowledging messages. **/ - CLIENT_ACK = 3 -}; /** * Represents an AMQP channel, i.e. loosely a session of work. It @@ -79,41 +50,34 @@ enum ack_modes { * * \ingroup clientapi */ -class Channel : public framing::ChannelAdapter, - public sys::Runnable +class Channel : public framing::ChannelAdapter { - struct Consumer{ - MessageListener* listener; - int ackMode; - int count; - u_int64_t lastDeliveryTag; - }; - typedef std::map<std::string, Consumer> ConsumerMap; + private: + // TODO aconway 2007-02-22: Remove friendship. + friend class Basic; + // FIXME aconway 2007-02-22: friend class Message; + struct UnknownMethod {}; + sys::Mutex lock; + Basic basic; Connection* connection; - sys::Thread dispatcher; - IncomingMessage incoming; + sys::Thread basicDispatcher; ResponseHandler responses; - ConsumerMap consumers; - ReturnedMessageHandler* returnsHandler; u_int16_t prefetch; const bool transactional; framing::ProtocolVersion version; - void retrieve(Message& msg); - void deliver(Consumer& consumer, Message& msg); - void handleHeader(framing::AMQHeaderBody::shared_ptr body); void handleContent(framing::AMQContentBody::shared_ptr body); void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body); - void handleMethodInContext( - framing::AMQMethodBody::shared_ptr, - const framing::MethodContext& method); + framing::AMQMethodBody::shared_ptr, const framing::MethodContext&); + void handleChannel(framing::AMQMethodBody::shared_ptr method); + void handleConnection(framing::AMQMethodBody::shared_ptr method); + void setQos(); - void cancelAll(); void protocolInit( const std::string& uid, const std::string& pwd, @@ -148,18 +112,18 @@ class Channel : public framing::ChannelAdapter, public: /** - * Creates a channel object. - * - * @param transactional if true, the publishing and acknowledgement - * of messages will be transactional and can be committed or - * aborted in atomic units (@see commit(), @see rollback()) - * - * @param prefetch specifies the number of unacknowledged - * messages the channel is willing to have sent to it - * asynchronously + * Creates a channel object. + * + * @param transactional if true, the publishing and acknowledgement + * of messages will be transactional and can be committed or + * aborted in atomic units (@see commit(), @see rollback()) + * + * @param prefetch specifies the number of unacknowledged + * messages the channel is willing to have sent to it + * asynchronously */ - Channel(bool transactional = false, u_int16_t prefetch = 500); - ~Channel(); + Channel(bool transactional = false, u_int16_t prefetch = 500); + ~Channel(); /** * Declares an exchange. @@ -221,85 +185,16 @@ class Channel : public framing::ChannelAdapter, * @param synch if true this call will block until a response * is received from the broker */ - void bind(const Exchange& exchange, const Queue& queue, const std::string& key, - const framing::FieldTable& args, bool synch = true); - /** - * Creates a 'consumer' for a queue. Messages in (or arriving - * at) that queue will be delivered to consumers - * asynchronously. - * - * @param queue a Queue instance representing the queue to - * consume from - * - * @param tag an identifier to associate with the consumer - * that can be used to cancel its subscription (if empty, this - * will be assigned by the broker) - * - * @param listener a pointer to an instance of an - * implementation of the MessageListener interface. Messages - * received from this queue for this consumer will result in - * invocation of the received() method on the listener, with - * the message itself passed in. - * - * @param ackMode the mode of acknowledgement that the broker - * should assume for this consumer. @see ack_modes - * - * @param noLocal if true, this consumer will not be sent any - * message published by this connection - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void consume( - Queue& queue, std::string& tag, MessageListener* listener, - int ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const framing::FieldTable* fields = 0); - - /** - * Cancels a subscription previously set up through a call to consume(). - * - * @param tag the identifier used (or assigned) in the consume - * request that set up the subscription to be cancelled. - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void cancel(const std::string& tag, bool synch = true); + void bind(const Exchange& exchange, const Queue& queue, + const std::string& key, const framing::FieldTable& args, + bool synch = true); + /** - * Synchronous pull of a message from a queue. - * - * @param msg a message object that will contain the message - * headers and content if the call completes. - * - * @param queue the queue to consume from - * - * @param ackMode the acknowledgement mode to use (@see - * ack_modes) - * - * @return true if a message was succcessfully dequeued from - * the queue, false if the queue was empty. + * Get a Basic object which provides functions to send and + * receive messages using the AMQP 0-8 Basic class methods. + *@see Basic */ - bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK); - /** - * Publishes (i.e. sends a message to the broker). - * - * @param msg the message to publish - * - * @param exchange the exchange to publish the message to - * - * @param routingKey the routing key to publish with - * - * @param mandatory if true and the exchange to which this - * publish is directed has no matching bindings, the message - * will be returned (see setReturnedMessageHandler()). - * - * @param immediate if true and there is no consumer to - * receive this message on publication, the message will be - * returned (see setReturnedMessageHandler()). - */ - void publish(const Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory = false, bool immediate = false); + Basic& getBasic() { return basic; } /** * For a transactional channel this will commit all @@ -314,6 +209,7 @@ class Channel : public framing::ChannelAdapter, * object is created (@see Channel()). */ void commit(); + /** * For a transactional channel, this will rollback any * publications or acknowledgements. It will be as if the @@ -327,33 +223,25 @@ class Channel : public framing::ChannelAdapter, */ void setPrefetch(u_int16_t prefetch); + u_int16_t getPrefetch() { return prefetch; } + /** * Start message dispatching on a new thread */ void start(); - // TODO aconway 2007-01-26: Can it be private? - /** - * Dispatch messages on this channel in the calling thread. - */ - void run(); - /** * Close the channel with optional error information. * Closing a channel that is not open has no effect. */ void close( framing::ReplyCode = 200, const std::string& ="OK", - framing::ClassId = 0, framing::MethodId = 0); - - /** - * Set a handler for this channel that will process any - * returned messages - * - * @see publish() - */ - void setReturnedMessageHandler(ReturnedMessageHandler* handler); + framing::ClassId = 0, framing::MethodId = 0); + /** True if the channel is transactional */ + bool isTransactional() { return transactional; } + + /** True if the channel is open */ bool isOpen() const; }; diff --git a/cpp/lib/client/ClientMessage.h b/cpp/lib/client/ClientMessage.h index 8661f6b791..0b44d51a47 100644 --- a/cpp/lib/client/ClientMessage.h +++ b/cpp/lib/client/ClientMessage.h @@ -112,7 +112,8 @@ class Message { { return method; } void setMethod(framing::AMQMethodBody::shared_ptr m) { method=m; } - boost::shared_ptr<framing::AMQHeaderBody> getHeader(); + boost::shared_ptr<framing::AMQHeaderBody> getHeader() const + { return header; } // TODO aconway 2007-02-15: remove friendships. friend class IncomingMessage; diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index 6a8d88d581..c008463ff6 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -31,6 +31,7 @@ #include <sys/ShutdownHandler.h> #include <sys/TimeoutHandler.h> + #include "framing/amqp_types.h" #include <framing/amqp_framing.h> #include <ClientExchange.h> diff --git a/cpp/lib/client/Makefile.am b/cpp/lib/client/Makefile.am index e4bd5a1151..9f345d866d 100644 --- a/cpp/lib/client/Makefile.am +++ b/cpp/lib/client/Makefile.am @@ -13,6 +13,7 @@ libqpidclient_la_SOURCES = \ ClientExchange.cpp \ ClientMessage.cpp \ ClientQueue.cpp \ + Basic.cpp \ Connection.cpp \ Connector.cpp \ IncomingMessage.cpp \ @@ -24,6 +25,7 @@ pkginclude_HEADERS = \ ClientExchange.h \ ClientMessage.h \ ClientQueue.h \ + Basic.h \ Connection.h \ Connector.h \ IncomingMessage.h \ |
