diff options
| author | Alan Conway <aconway@apache.org> | 2007-01-29 16:13:24 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-01-29 16:13:24 +0000 |
| commit | 5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d (patch) | |
| tree | f9a982b65400154a86edd02faf75da143a96404c /cpp/lib/client | |
| parent | 5d28464c46c1e64ded078a4585f0f49e30b8b5d6 (diff) | |
| download | qpid-python-5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d.tar.gz | |
* Added ClientAdapter - client side ChannelAdapter. Updated client side.
* Moved ChannelAdapter initialization from ctor to init(), updated broker side.
* Improved various exception messages with boost::format messages.
* Removed unnecssary virtual inheritance.
* Widespread: fixed incorrect non-const ProtocolVersion& parameters.
* Client API: pass channels by reference, not pointer.
* codegen:
- MethodBodyClass.h.templ: Added CLASS_ID, METHOD_ID and isA() template.
- Various: fixed non-const ProtocolVersion& parameters.
* cpp/bootstrap: Allow config arguments with -build.
* cpp/gen/Makefile.am: Merged codegen fixes from trunk.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client')
| -rw-r--r-- | cpp/lib/client/ClientAdapter.cpp | 70 | ||||
| -rw-r--r-- | cpp/lib/client/ClientAdapter.h | 66 | ||||
| -rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 408 | ||||
| -rw-r--r-- | cpp/lib/client/ClientChannel.h | 550 | ||||
| -rw-r--r-- | cpp/lib/client/Connection.cpp | 254 | ||||
| -rw-r--r-- | cpp/lib/client/Connection.h | 273 | ||||
| -rw-r--r-- | cpp/lib/client/Connector.cpp | 30 | ||||
| -rw-r--r-- | cpp/lib/client/Connector.h | 14 | ||||
| -rw-r--r-- | cpp/lib/client/ResponseHandler.cpp | 39 | ||||
| -rw-r--r-- | cpp/lib/client/ResponseHandler.h | 51 |
10 files changed, 960 insertions, 795 deletions
diff --git a/cpp/lib/client/ClientAdapter.cpp b/cpp/lib/client/ClientAdapter.cpp new file mode 100644 index 0000000000..c77f049c96 --- /dev/null +++ b/cpp/lib/client/ClientAdapter.cpp @@ -0,0 +1,70 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "AMQP_ClientOperations.h" +#include "ClientAdapter.h" +#include "Connection.h" +#include "Exception.h" +#include "AMQMethodBody.h" + +namespace qpid { +namespace client { + +using namespace qpid; +using namespace qpid::framing; + +typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; + +void ClientAdapter::handleMethodInContext( + boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const MethodContext& context +) +{ + try{ + method->invoke(*clientOps, context); + }catch(ChannelException& e){ + connection.client->getChannel().close( + context, e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + connection.closeChannel(getId()); + }catch(ConnectionException& e){ + connection.client->getConnection().close( + context, e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + }catch(std::exception& e){ + connection.client->getConnection().close( + context, 541/*internal error*/, e.what(), + method->amqpClassId(), method->amqpMethodId()); + } +} + +void ClientAdapter::handleHeader(AMQHeaderBody::shared_ptr body) { + channel->handleHeader(body); +} + +void ClientAdapter::handleContent(AMQContentBody::shared_ptr body) { + channel->handleContent(body); +} + +void ClientAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) { + // TODO aconway 2007-01-17: Implement heartbeats. +} + + + +}} // namespace qpid::client + diff --git a/cpp/lib/client/ClientAdapter.h b/cpp/lib/client/ClientAdapter.h new file mode 100644 index 0000000000..d5e16fc6ad --- /dev/null +++ b/cpp/lib/client/ClientAdapter.h @@ -0,0 +1,66 @@ +#ifndef _client_ClientAdapter_h +#define _client_ClientAdapter_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "ChannelAdapter.h" +#include "ClientChannel.h" + +namespace qpid { +namespace client { + +class AMQMethodBody; +class Connection; + +/** + * Per-channel protocol adapter. + * + * Translates protocol bodies into calls on the core Channel, + * Connection and Client objects. + * + * Owns a channel, has references to Connection and Client. + */ +class ClientAdapter : public framing::ChannelAdapter +{ + public: + ClientAdapter(std::auto_ptr<Channel> ch, Connection&, Client&); + Channel& getChannel() { return *channel; } + + void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>); + void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>); + void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>); + + private: + void handleMethodInContext( + boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const framing::MethodContext& context); + + class ClientOps; + + std::auto_ptr<Channel> channel; + Connection& connection; + Client& client; + boost::shared_ptr<ClientOps> clientOps; +}; + +}} // namespace qpid::client + + + +#endif /*!_client_ClientAdapter_h*/ diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index d9edb2f390..b93596ebfc 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -23,42 +23,115 @@ #include <ClientMessage.h> #include <QpidError.h> #include <MethodBodyInstances.h> +#include "Connection.h" + +// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent +// handling of errors that should close the connection or the channel. +// Make sure the user thread receives a connection in each case. +// using namespace boost; //to use dynamic_pointer_cast using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; +const std::string Channel::OK("OK"); + Channel::Channel(bool _transactional, u_int16_t _prefetch) : - id(0), - con(0), - out(0), + connection(0), incoming(0), - closed(true), prefetch(_prefetch), - transactional(_transactional), -// AMQP version management change - kpvdr 2006-11-20 -// TODO: Make this class version-aware and link these hard-wired numbers to that version - version(8, 0) + transactional(_transactional) { } Channel::~Channel(){ - stop(); + close(); +} + +void Channel::open(ChannelId id, Connection& con) +{ + if (isOpen()) + THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id); + connection = &con; + init(id, con, con.getVersion()); // ChannelAdapter initialization. + string oob; + if (id != 0) + sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob)); +} + +void Channel::protocolInit( + const std::string& uid, const std::string& pwd, const std::string& vhost) { + assert(connection); + responses.expect(); + connection->connector->init(); // Send ProtocolInit block. + responses.receive<ConnectionStartBody>(); + + FieldTable props; + string mechanism("PLAIN"); + string response = ((char)0) + uid + ((char)0) + pwd; + string locale("en_US"); + // TODO aconway 2007-01-26: Move client over to proxy model, + // symmetric with server. + ConnectionTuneBody::shared_ptr proposal = + sendAndReceive<ConnectionTuneBody>( + new ConnectionStartOkBody( + version, props, mechanism, response, locale)); + + /** + * Assume for now that further challenges will not be required + //receive connection.secure + responses.receive(connection_secure)); + //send connection.secure-ok + connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); + **/ + + connection->send( + new AMQFrame( + version, 0, + new ConnectionTuneOkBody( + version, proposal->getChannelMax(), + connection->getMaxFrameSize(), + proposal->getHeartbeat()))); + + u_int16_t heartbeat = proposal->getHeartbeat(); + connection->connector->setReadTimeout(heartbeat * 2); + connection->connector->setWriteTimeout(heartbeat); + + // Send connection open. + std::string capabilities; + responses.expect(); + send(new AMQFrame( + version, 0, + new ConnectionOpenBody(version, vhost, capabilities, true))); + //receive connection.open-ok (or redirect, but ignore that for now + //esp. as using force=true). + responses.waitForResponse(); + if(responses.validate<ConnectionOpenOkBody>()) { + //ok + }else if(responses.validate<ConnectionRedirectBody>()){ + //ignore for now + ConnectionRedirectBody::shared_ptr redirect( + shared_polymorphic_downcast<ConnectionRedirectBody>( + responses.getResponse())); + std::cout << "Received redirection to " << redirect->getHost() + << std::endl; + } else { + THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); + } } + +bool Channel::isOpen() const { return connection; } void Channel::setPrefetch(u_int16_t _prefetch){ prefetch = _prefetch; - if(con != 0 && out != 0){ - setQos(); - } + setQos(); } void Channel::setQos(){ -// AMQP version management change - kpvdr 2006-11-20 -// TODO: Make this class version-aware and link these hard-wired numbers to that version - sendAndReceive(new AMQFrame(version, id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok); + sendAndReceive<BasicQosOkBody>( + new BasicQosBody(version, 0, prefetch, false)); if(transactional){ - sendAndReceive(new AMQFrame(version, id, new TxSelectBody(version)), method_bodies.tx_select_ok); + sendAndReceive<TxSelectOkBody>(new TxSelectBody(version)); } } @@ -66,62 +139,51 @@ void Channel::declareExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); string type = exchange.getType(); FieldTable args; - AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args)); - if(synch){ - sendAndReceive(frame, method_bodies.exchange_declare_ok); - }else{ - out->send(frame); - } + sendAndReceiveSync<ExchangeDeclareOkBody>( + synch, + new ExchangeDeclareBody( + version, 0, name, type, false, false, false, false, !synch, args)); } void Channel::deleteExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); - AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeleteBody(version, 0, name, false, !synch)); - if(synch){ - sendAndReceive(frame, method_bodies.exchange_delete_ok); - }else{ - out->send(frame); - } + sendAndReceiveSync<ExchangeDeleteOkBody>( + synch, + new ExchangeDeleteBody(version, 0, name, false, !synch)); } void Channel::declareQueue(Queue& queue, bool synch){ string name = queue.getName(); FieldTable args; - AMQFrame* frame = new AMQFrame(version, id, new QueueDeclareBody(version, 0, name, false, false, - queue.isExclusive(), - queue.isAutoDelete(), !synch, args)); - if(synch){ - sendAndReceive(frame, method_bodies.queue_declare_ok); + sendAndReceiveSync<QueueDeclareOkBody>( + synch, + new QueueDeclareBody( + version, 0, name, false, false, + queue.isExclusive(), queue.isAutoDelete(), !synch, args)); + if (synch) { if(queue.getName().length() == 0){ QueueDeclareOkBody::shared_ptr response = - dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse()); + shared_polymorphic_downcast<QueueDeclareOkBody>( + responses.getResponse()); queue.setName(response->getQueue()); } - }else{ - out->send(frame); } } void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ //ticket, queue, ifunused, ifempty, nowait string name = queue.getName(); - AMQFrame* frame = new AMQFrame(version, id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); - if(synch){ - sendAndReceive(frame, method_bodies.queue_delete_ok); - }else{ - out->send(frame); - } + sendAndReceiveSync<QueueDeleteOkBody>( + synch, + new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); } void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ string e = exchange.getName(); string q = queue.getName(); - AMQFrame* frame = new AMQFrame(version, id, new QueueBindBody(version, 0, q, e, key,!synch, args)); - if(synch){ - sendAndReceive(frame, method_bodies.queue_bind_ok); - }else{ - out->send(frame); - } + sendAndReceiveSync<QueueBindOkBody>( + synch, + new QueueBindBody(version, 0, q, e, key,!synch, args)); } void Channel::consume( @@ -129,52 +191,48 @@ void Channel::consume( int ackMode, bool noLocal, bool synch, const FieldTable* fields) { string q = queue.getName(); - AMQFrame* frame = - new AMQFrame(version, - id, - new BasicConsumeBody( - version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable())); - if(synch){ - sendAndReceive(frame, method_bodies.basic_consume_ok); - BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse()); + sendAndReceiveSync<BasicConsumeOkBody>( + synch, + new BasicConsumeBody( + version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch, + fields ? *fields : FieldTable())); + if (synch) { + BasicConsumeOkBody::shared_ptr response = + shared_polymorphic_downcast<BasicConsumeOkBody>( + responses.getResponse()); tag = response->getConsumerTag(); - }else{ - out->send(frame); - } - Consumer* c = new Consumer(); - c->listener = listener; - c->ackMode = ackMode; - c->lastDeliveryTag = 0; - consumers[tag] = c; -} - -void Channel::cancel(std::string& tag, bool synch){ - Consumer* c = consumers[tag]; - if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true))); } - - AMQFrame* frame = new AMQFrame(version, id, new BasicCancelBody(version, (string&) tag, !synch)); - if(synch){ - sendAndReceive(frame, method_bodies.basic_cancel_ok); - }else{ - out->send(frame); - } - consumers.erase(tag); - if(c != 0){ - delete c; + Consumer& c = consumers[tag]; + c.listener = listener; + c.ackMode = ackMode; + c.lastDeliveryTag = 0; +} + +void Channel::cancel(const std::string& tag, bool synch) { + ConsumerMap::iterator i = consumers.find(tag); + if (i != consumers.end()) { + Consumer& c = i->second; + if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) + send(new BasicAckBody(version, c.lastDeliveryTag, true)); + sendAndReceiveSync<BasicCancelOkBody>( + synch, new BasicCancelBody(version, tag, !synch)); + consumers.erase(tag); } } void Channel::cancelAll(){ - for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){ - Consumer* c = i->second; - if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true))); + while(!consumers.empty()) { + Consumer c = consumers.begin()->second; + consumers.erase(consumers.begin()); + if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) + && c.lastDeliveryTag > 0) + { + // Let exceptions propagate, if one fails no point + // trying the rest. NB no memory leaks if we do, + // ConsumerMap holds values, not pointers. + // + send(new BasicAckBody(version, c.lastDeliveryTag, true)); } - consumers.erase(i); - delete c; } } @@ -191,26 +249,28 @@ void Channel::retrieve(Message& msg){ retrieved = 0; } -bool Channel::get(Message& msg, const Queue& queue, int ackMode){ +bool Channel::get(Message& msg, const Queue& queue, int ackMode) { string name = queue.getName(); - AMQFrame* frame = new AMQFrame(version, id, new BasicGetBody(version, 0, name, ackMode)); + AMQBody::shared_ptr body(new BasicGetBody(version, 0, name, ackMode)); responses.expect(); - out->send(frame); + send(body); responses.waitForResponse(); AMQMethodBody::shared_ptr response = responses.getResponse(); - if(method_bodies.basic_get_ok.match(response.get())){ + if(response->isA<BasicGetOkBody>()) { if(incoming != 0){ std::cout << "Existing message not complete" << std::endl; + // FIXME aconway 2007-01-26: close the connection? the channel? THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); }else{ incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response)); } retrieve(msg); return true; - }if(method_bodies.basic_get_empty.match(response.get())){ + }if(response->isA<BasicGetEmptyBody>()){ return false; }else{ - THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get."); + // FIXME aconway 2007-01-26: must close the connection. + THROW_QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame"); } } @@ -219,25 +279,24 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& string e = exchange.getName(); string key = routingKey; - out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); + send(new BasicPublishBody(version, 0, e, key, mandatory, immediate)); //break msg up into header frame and content frame(s) and send these string data = msg.getData(); msg.header->setContentSize(data.length()); - AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header)); - out->send(new AMQFrame(version, id, body)); + send(msg.header); u_int64_t data_length = data.length(); if(data_length > 0){ - u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes + u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes if(data_length < frag_size){ - out->send(new AMQFrame(version, id, new AMQContentBody(data))); + send(new AMQContentBody(data)); }else{ u_int32_t offset = 0; u_int32_t remaining = data_length - offset; while (remaining > 0) { u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(data.substr(offset, length)); - out->send(new AMQFrame(version, id, new AMQContentBody(frag))); + send(new AMQContentBody(frag)); offset += length; remaining = data_length - offset; @@ -247,56 +306,48 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& } void Channel::commit(){ - AMQFrame* frame = new AMQFrame(version, id, new TxCommitBody(version)); - sendAndReceive(frame, method_bodies.tx_commit_ok); + sendAndReceive<TxCommitOkBody>(new TxCommitBody(version)); } void Channel::rollback(){ - AMQFrame* frame = new AMQFrame(version, id, new TxRollbackBody(version)); - sendAndReceive(frame, method_bodies.tx_rollback_ok); -} - -void Channel::handleRequest(AMQRequestBody::shared_ptr body) { - // FIXME aconway 2007-01-19: request/response handling. - handleMethod(body); + sendAndReceive<TxRollbackOkBody>(new TxRollbackBody(version)); } -void Channel::handleResponse(AMQResponseBody::shared_ptr body) { - // FIXME aconway 2007-01-19: request/response handling. - handleMethod(body); -} - -void Channel::handleMethod(AMQMethodBody::shared_ptr body){ - //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request +void Channel::handleMethodInContext( + AMQMethodBody::shared_ptr body, const MethodContext&) +{ + //channel.flow, channel.close, basic.deliver, basic.return or a + //response to a synchronous request if(responses.isWaiting()){ responses.signalResponse(body); - }else if(method_bodies.basic_deliver.match(body.get())){ + }else if(body->isA<BasicDeliverBody>()) { if(incoming != 0){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl; THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); }else{ incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body)); } - }else if(method_bodies.basic_return.match(body.get())){ + }else if(body->isA<BasicReturnBody>()){ if(incoming != 0){ std::cout << "Existing message not complete" << std::endl; THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); }else{ incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body)); } - }else if(method_bodies.channel_close.match(body.get())){ - con->removeChannel(this); - //need to signal application that channel has been closed through exception - - }else if(method_bodies.channel_flow.match(body.get())){ - + }else if(body->isA<ChannelCloseBody>()){ + peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body)); + }else if(body->isA<ChannelFlowBody>()){ + // TODO aconway 2007-01-24: + }else if(body->isA<ConnectionCloseBody>()){ + connection->close(); }else{ - //signal error - std::cout << "Unhandled method: " << *body << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method"); + connection->close( + 504, "Unrecognised method", + body->amqpClassId(), body->amqpMethodId()); } } - + void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ if(incoming == 0){ //handle invalid frame sequence @@ -331,27 +382,16 @@ void Channel::start(){ dispatcher = Thread(this); } -void Channel::stop(){ - { - Monitor::ScopedLock l(dispatchMonitor); - closed = true; - responses.signalResponse(AMQMethodBody::shared_ptr()); - dispatchMonitor.notify(); - } - dispatcher.join(); -} - void Channel::run(){ dispatch(); } void Channel::enqueue(){ + Monitor::ScopedLock l(retrievalMonitor); if(incoming->isResponse()){ - Monitor::ScopedLock l(retrievalMonitor); retrieved = incoming; retrievalMonitor.notify(); }else{ - Monitor::ScopedLock l(dispatchMonitor); messages.push(incoming); dispatchMonitor.notify(); } @@ -360,7 +400,7 @@ void Channel::enqueue(){ IncomingMessage* Channel::dequeue(){ Monitor::ScopedLock l(dispatchMonitor); - while(messages.empty() && !closed){ + while(messages.empty() && isOpen()){ dispatchMonitor.wait(); } IncomingMessage* msg = 0; @@ -371,25 +411,25 @@ IncomingMessage* Channel::dequeue(){ return msg; } -void Channel::deliver(Consumer* consumer, Message& msg){ +void Channel::deliver(Consumer& consumer, Message& msg){ //record delivery tag: - consumer->lastDeliveryTag = msg.getDeliveryTag(); + consumer.lastDeliveryTag = msg.getDeliveryTag(); //allow registered listener to handle the message - consumer->listener->received(msg); + consumer.listener->received(msg); //if the handler calls close on the channel or connection while //handling this message, then consumer will now have been deleted. - if(!closed){ + if(isOpen()){ bool multiple(false); - switch(consumer->ackMode){ - case LAZY_ACK: + switch(consumer.ackMode){ + case LAZY_ACK: multiple = true; - if(++(consumer->count) < prefetch) break; + if(++(consumer.count) < prefetch) break; //else drop-through - case AUTO_ACK: - out->send(new AMQFrame(version, id, new BasicAckBody(version, msg.getDeliveryTag(), multiple))); - consumer->lastDeliveryTag = 0; + case AUTO_ACK: + send(new BasicAckBody(version, msg.getDeliveryTag(), multiple)); + consumer.lastDeliveryTag = 0; } } @@ -399,7 +439,7 @@ void Channel::deliver(Consumer* consumer, Message& msg){ } void Channel::dispatch(){ - while(!closed){ + while(isOpen()){ IncomingMessage* incomingMsg = dequeue(); if(incomingMsg){ //Note: msg is currently only valid for duration of this call @@ -416,12 +456,10 @@ void Channel::dispatch(){ msg.deliveryTag = incomingMsg->getDeliveryTag(); std::string tag = incomingMsg->getConsumerTag(); - if(consumers[tag] == 0){ - //signal error + if(consumers.find(tag) == consumers.end()) std::cout << "Unknown consumer: " << tag << std::endl; - }else{ + else deliver(consumers[tag], msg); - } } delete incomingMsg; } @@ -432,14 +470,60 @@ void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ returnsHandler = handler; } -void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ - responses.expect(); - out->send(frame); - responses.receive(body); +// Close called by local application. +void Channel::close( + u_int16_t code, const std::string& text, + ClassId classId, MethodId methodId) +{ + // FIXME aconway 2007-01-26: Locking? + if (getId() != 0 && isOpen()) { + try { + sendAndReceive<ChannelCloseOkBody>( + new ChannelCloseBody(version, code, text, classId, methodId)); + cancelAll(); + closeInternal(); + } catch (...) { + closeInternal(); + throw; + } + } +} + +// Channel closed by peer. +void Channel::peerClose(ChannelCloseBody::shared_ptr) { + assert(isOpen()); + closeInternal(); + // FIXME aconway 2007-01-26: How to throw the proper exception + // to the application thread? } -void Channel::close(){ - if(con != 0){ - con->closeChannel(this); +void Channel::closeInternal() { + assert(isOpen()); + { + Monitor::ScopedLock l(dispatchMonitor); + static_cast<ConnectionForChannel*>(connection)->erase(getId()); + connection = 0; + // A 0 response means we are closed. + responses.signalResponse(AMQMethodBody::shared_ptr()); + dispatchMonitor.notify(); } + dispatcher.join(); } + +void Channel::sendAndReceive(AMQBody* toSend, ClassId c, MethodId m) +{ + responses.expect(); + send(toSend); + responses.receive(c, m); +} + +void Channel::sendAndReceiveSync( + bool sync, AMQBody* body, ClassId c, MethodId m) +{ + if(sync) + sendAndReceive(body, c, m); + else + send(body); +} + + diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index e7bab8b4ee..67274ddfc4 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -27,7 +27,6 @@ #include "sys/types.h" #include <framing/amqp_framing.h> -#include <Connection.h> #include <ClientExchange.h> #include <IncomingMessage.h> #include <ClientMessage.h> @@ -35,86 +34,126 @@ #include <ClientQueue.h> #include <ResponseHandler.h> #include <ReturnedMessageHandler.h> +#include "Runnable.h" +#include "ChannelAdapter.h" +#include "Thread.h" namespace qpid { +namespace framing { +class ChannelCloseBody; +} + namespace client { - /** - * The available acknowledgements modes - * - * \ingroup clientapi - */ - enum ack_modes { - /** No acknowledgement will be sent, broker can - discard messages as soon as they are delivered - to a consumer using this mode. **/ - NO_ACK = 0, - /** Each message will be automatically - acknowledged as soon as it is delivered to the - application **/ - AUTO_ACK = 1, - /** Acknowledgements will be sent automatically, - but not for each message. **/ - LAZY_ACK = 2, - /** The application is responsible for explicitly - acknowledging messages. **/ - CLIENT_ACK = 3 + +class Connection; + +/** + * The available acknowledgements modes + * + * \ingroup clientapi + */ +enum ack_modes { + /** No acknowledgement will be sent, broker can + discard messages as soon as they are delivered + to a consumer using this mode. **/ + NO_ACK = 0, + /** Each message will be automatically + acknowledged as soon as it is delivered to the + application **/ + AUTO_ACK = 1, + /** Acknowledgements will be sent automatically, + but not for each message. **/ + LAZY_ACK = 2, + /** The application is responsible for explicitly + acknowledging messages. **/ + CLIENT_ACK = 3 +}; + +/** + * Represents an AMQP channel, i.e. loosely a session of work. It + * is through a channel that most of the AMQP 'methods' are + * exposed. + * + * \ingroup clientapi + */ +class Channel : public framing::ChannelAdapter, + public sys::Runnable +{ + struct Consumer{ + MessageListener* listener; + int ackMode; + int count; + u_int64_t lastDeliveryTag; }; + typedef std::map<std::string, Consumer> ConsumerMap; + static const std::string OK; + + Connection* connection; + sys::Thread dispatcher; + IncomingMessage* incoming; + ResponseHandler responses; + std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume + IncomingMessage* retrieved;//holds response to basic.get + sys::Monitor dispatchMonitor; + sys::Monitor retrievalMonitor; + ConsumerMap consumers; + ReturnedMessageHandler* returnsHandler; - /** - * Represents an AMQP channel, i.e. loosely a session of work. It - * is through a channel that most of the AMQP 'methods' are - * exposed. - * - * \ingroup clientapi - */ - class Channel : private virtual framing::BodyHandler, - public virtual sys::Runnable - { - struct Consumer{ - MessageListener* listener; - int ackMode; - int count; - u_int64_t lastDeliveryTag; - }; - typedef std::map<std::string,Consumer*>::iterator consumer_iterator; + u_int16_t prefetch; + const bool transactional; + framing::ProtocolVersion version; + + void enqueue(); + void retrieve(Message& msg); + IncomingMessage* dequeue(); + void dispatch(); + void deliver(Consumer& consumer, Message& msg); + + void handleHeader(framing::AMQHeaderBody::shared_ptr body); + void handleContent(framing::AMQContentBody::shared_ptr body); + void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body); + + void handleMethodInContext( + framing::AMQMethodBody::shared_ptr, + const framing::MethodContext& method); + void setQos(); + void cancelAll(); - u_int16_t id; - Connection* con; - sys::Thread dispatcher; - framing::OutputHandler* out; - IncomingMessage* incoming; - ResponseHandler responses; - std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume - IncomingMessage* retrieved;//holds response to basic.get - sys::Monitor dispatchMonitor; - sys::Monitor retrievalMonitor; - std::map<std::string, Consumer*> consumers; - ReturnedMessageHandler* returnsHandler; - bool closed; + void protocolInit( + const std::string& uid, const std::string& pwd, + const std::string& vhost); + + void sendAndReceive( + framing::AMQBody*, framing::ClassId, framing::MethodId); - u_int16_t prefetch; - const bool transactional; - framing::ProtocolVersion version; + void sendAndReceiveSync( + bool sync, + framing::AMQBody*, framing::ClassId, framing::MethodId); - void enqueue(); - void retrieve(Message& msg); - IncomingMessage* dequeue(); - void dispatch(); - void stop(); - void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body); - void deliver(Consumer* consumer, Message& msg); - void setQos(); - void cancelAll(); + template <class BodyType> + boost::shared_ptr<BodyType> sendAndReceive(framing::AMQBody* body) { + sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID); + return boost::shared_polymorphic_downcast<BodyType>( + responses.getResponse()); + } - virtual void handleMethod(framing::AMQMethodBody::shared_ptr body); - virtual void handleHeader(framing::AMQHeaderBody::shared_ptr body); - virtual void handleContent(framing::AMQContentBody::shared_ptr body); - virtual void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body); - void handleRequest(framing::AMQRequestBody::shared_ptr); - void handleResponse(framing::AMQResponseBody::shared_ptr); + template <class BodyType> void sendAndReceiveSync( + bool sync, framing::AMQBody* body) { + sendAndReceiveSync( + sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID); + } - public: - /** + void open(framing::ChannelId, Connection&); + void closeInternal(); + void peerClose(boost::shared_ptr<framing::ChannelCloseBody>); + + friend class Connection; + + public: + + bool isOpen() const; + + /** * Creates a channel object. * * @param transactional if true, the publishing and acknowledgement @@ -124,199 +163,202 @@ namespace client { * @param prefetch specifies the number of unacknowledged * messages the channel is willing to have sent to it * asynchronously - */ + */ Channel(bool transactional = false, u_int16_t prefetch = 500); ~Channel(); - /** - * Declares an exchange. - * - * In AMQP Exchanges are the destinations to which messages - * are published. They have Queues bound to them and route - * messages they receive to those queues. The routing rules - * depend on the type of the exchange. - * - * @param exchange an Exchange object representing the - * exchange to declare - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void declareExchange(Exchange& exchange, bool synch = true); - /** - * Deletes an exchange - * - * @param exchange an Exchange object representing the exchange to delete - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void deleteExchange(Exchange& exchange, bool synch = true); - /** - * Declares a Queue - * - * @param queue a Queue object representing the queue to declare - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void declareQueue(Queue& queue, bool synch = true); - /** - * Deletes a Queue - * - * @param queue a Queue object representing the queue to delete - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true); - /** - * Binds a queue to an exchange. The exact semantics of this - * (in particular how 'routing keys' and 'binding arguments' - * are used) depends on the type of the exchange. - * - * @param exchange an Exchange object representing the - * exchange to bind to - * - * @param queue a Queue object representing the queue to be - * bound - * - * @param key the 'routing key' for the binding - * - * @param args the 'binding arguments' for the binding - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void bind(const Exchange& exchange, const Queue& queue, const std::string& key, - const framing::FieldTable& args, bool synch = true); - /** - * Creates a 'consumer' for a queue. Messages in (or arriving - * at) that queue will be delivered to consumers - * asynchronously. - * - * @param queue a Queue instance representing the queue to - * consume from - * - * @param tag an identifier to associate with the consumer - * that can be used to cancel its subscription (if empty, this - * will be assigned by the broker) - * - * @param listener a pointer to an instance of an - * implementation of the MessageListener interface. Messages - * received from this queue for this consumer will result in - * invocation of the received() method on the listener, with - * the message itself passed in. - * - * @param ackMode the mode of acknowledgement that the broker - * should assume for this consumer. @see ack_modes - * - * @param noLocal if true, this consumer will not be sent any - * message published by this connection - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void consume( - Queue& queue, std::string& tag, MessageListener* listener, - int ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const framing::FieldTable* fields = 0); + /** + * Declares an exchange. + * + * In AMQP Exchanges are the destinations to which messages + * are published. They have Queues bound to them and route + * messages they receive to those queues. The routing rules + * depend on the type of the exchange. + * + * @param exchange an Exchange object representing the + * exchange to declare + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void declareExchange(Exchange& exchange, bool synch = true); + /** + * Deletes an exchange + * + * @param exchange an Exchange object representing the exchange to delete + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void deleteExchange(Exchange& exchange, bool synch = true); + /** + * Declares a Queue + * + * @param queue a Queue object representing the queue to declare + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void declareQueue(Queue& queue, bool synch = true); + /** + * Deletes a Queue + * + * @param queue a Queue object representing the queue to delete + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true); + /** + * Binds a queue to an exchange. The exact semantics of this + * (in particular how 'routing keys' and 'binding arguments' + * are used) depends on the type of the exchange. + * + * @param exchange an Exchange object representing the + * exchange to bind to + * + * @param queue a Queue object representing the queue to be + * bound + * + * @param key the 'routing key' for the binding + * + * @param args the 'binding arguments' for the binding + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void bind(const Exchange& exchange, const Queue& queue, const std::string& key, + const framing::FieldTable& args, bool synch = true); + /** + * Creates a 'consumer' for a queue. Messages in (or arriving + * at) that queue will be delivered to consumers + * asynchronously. + * + * @param queue a Queue instance representing the queue to + * consume from + * + * @param tag an identifier to associate with the consumer + * that can be used to cancel its subscription (if empty, this + * will be assigned by the broker) + * + * @param listener a pointer to an instance of an + * implementation of the MessageListener interface. Messages + * received from this queue for this consumer will result in + * invocation of the received() method on the listener, with + * the message itself passed in. + * + * @param ackMode the mode of acknowledgement that the broker + * should assume for this consumer. @see ack_modes + * + * @param noLocal if true, this consumer will not be sent any + * message published by this connection + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void consume( + Queue& queue, std::string& tag, MessageListener* listener, + int ackMode = NO_ACK, bool noLocal = false, bool synch = true, + const framing::FieldTable* fields = 0); - /** - * Cancels a subscription previously set up through a call to consume(). - * - * @param tag the identifier used (or assigned) in the consume - * request that set up the subscription to be cancelled. - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void cancel(std::string& tag, bool synch = true); - /** - * Synchronous pull of a message from a queue. - * - * @param msg a message object that will contain the message - * headers and content if the call completes. - * - * @param queue the queue to consume from - * - * @param ackMode the acknowledgement mode to use (@see - * ack_modes) - * - * @return true if a message was succcessfully dequeued from - * the queue, false if the queue was empty. - */ - bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK); - /** - * Publishes (i.e. sends a message to the broker). - * - * @param msg the message to publish - * - * @param exchange the exchange to publish the message to - * - * @param routingKey the routing key to publish with - * - * @param mandatory if true and the exchange to which this - * publish is directed has no matching bindings, the message - * will be returned (see setReturnedMessageHandler()). - * - * @param immediate if true and there is no consumer to - * receive this message on publication, the message will be - * returned (see setReturnedMessageHandler()). - */ - void publish(Message& msg, const Exchange& exchange, const std::string& routingKey, - bool mandatory = false, bool immediate = false); + /** + * Cancels a subscription previously set up through a call to consume(). + * + * @param tag the identifier used (or assigned) in the consume + * request that set up the subscription to be cancelled. + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void cancel(const std::string& tag, bool synch = true); + /** + * Synchronous pull of a message from a queue. + * + * @param msg a message object that will contain the message + * headers and content if the call completes. + * + * @param queue the queue to consume from + * + * @param ackMode the acknowledgement mode to use (@see + * ack_modes) + * + * @return true if a message was succcessfully dequeued from + * the queue, false if the queue was empty. + */ + bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK); + /** + * Publishes (i.e. sends a message to the broker). + * + * @param msg the message to publish + * + * @param exchange the exchange to publish the message to + * + * @param routingKey the routing key to publish with + * + * @param mandatory if true and the exchange to which this + * publish is directed has no matching bindings, the message + * will be returned (see setReturnedMessageHandler()). + * + * @param immediate if true and there is no consumer to + * receive this message on publication, the message will be + * returned (see setReturnedMessageHandler()). + */ + void publish(Message& msg, const Exchange& exchange, const std::string& routingKey, + bool mandatory = false, bool immediate = false); - /** - * For a transactional channel this will commit all - * publications and acknowledgements since the last commit (or - * the channel was opened if there has been no previous - * commit). This will cause published messages to become - * available to consumers and acknowledged messages to be - * consumed and removed from the queues they were dispatched - * from. - * - * Transactionailty of a channel is specified when the channel - * object is created (@see Channel()). - */ - void commit(); - /** - * For a transactional channel, this will rollback any - * publications or acknowledgements. It will be as if the - * ppblished messages were never sent and the acknowledged - * messages were never consumed. - */ - void rollback(); + /** + * For a transactional channel this will commit all + * publications and acknowledgements since the last commit (or + * the channel was opened if there has been no previous + * commit). This will cause published messages to become + * available to consumers and acknowledged messages to be + * consumed and removed from the queues they were dispatched + * from. + * + * Transactionailty of a channel is specified when the channel + * object is created (@see Channel()). + */ + void commit(); + /** + * For a transactional channel, this will rollback any + * publications or acknowledgements. It will be as if the + * ppblished messages were never sent and the acknowledged + * messages were never consumed. + */ + void rollback(); - /** - * Change the prefetch in use. - */ - void setPrefetch(u_int16_t prefetch); + /** + * Change the prefetch in use. + */ + void setPrefetch(u_int16_t prefetch); - /** - * Start message dispatching on a new thread - */ - void start(); - /** - * Do message dispatching on this thread - */ - void run(); + /** + * Start message dispatching on a new thread + */ + void start(); - /** - * Closes a channel, stopping any message dispatching. - */ - void close(); + // TODO aconway 2007-01-26: Can it be private? + /** + * Dispatch messages on this channel in the calling thread. + */ + void run(); - /** - * Set a handler for this channel that will process any - * returned messages - * - * @see publish() - */ - void setReturnedMessageHandler(ReturnedMessageHandler* handler); + /** + * Close the channel with optional error information. + * Closing a channel that is not open has no effect. + */ + void close( + framing::ReplyCode = 200, const std::string& =OK, + framing::ClassId = 0, framing::MethodId = 0); - friend class Connection; - }; + /** + * Set a handler for this channel that will process any + * returned messages + * + * @see publish() + */ + void setReturnedMessageHandler(ReturnedMessageHandler* handler); +}; } } diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index 1ae317db62..19d5cce7db 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -18,35 +18,46 @@ * under the License. * */ +#include <boost/format.hpp> + #include <Connection.h> #include <ClientChannel.h> #include <ClientMessage.h> #include <QpidError.h> #include <iostream> +#include <sstream> #include <MethodBodyInstances.h> -using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; using namespace qpid::sys; -u_int16_t Connection::channelIdCounter; + +namespace qpid { +namespace client { + +ChannelId Connection::channelIdCounter; + +const std::string Connection::OK("OK"); Connection::Connection( bool debug, u_int32_t _max_frame_size, - qpid::framing::ProtocolVersion* _version + const framing::ProtocolVersion& _version ) : max_frame_size(_max_frame_size), closed(true), - version(_version->getMajor(),_version->getMinor()) + version(_version) { - connector = new Connector( - version, requester, responder, debug, _max_frame_size); + connector = new Connector(version, debug, _max_frame_size); } Connection::~Connection(){ delete connector; } -void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){ +void Connection::open( + const std::string& _host, int _port, const std::string& uid, + const std::string& pwd, const std::string& virtualhost) +{ + host = _host; port = _port; connector->setInputHandler(this); @@ -55,197 +66,69 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui out = connector->getOutputHandler(); connector->connect(host, port); - ProtocolInitiation* header = new ProtocolInitiation(version); - responses.expect(); - connector->init(header); - responses.receive(method_bodies.connection_start); - - FieldTable props; - string mechanism("PLAIN"); - string response = ((char)0) + uid + ((char)0) + pwd; - string locale("en_US"); - responses.expect(); - out->send(new AMQFrame(version, 0, new ConnectionStartOkBody(version, props, mechanism, response, locale))); - - /** - * Assume for now that further challenges will not be required - //receive connection.secure - responses.receive(connection_secure)); - //send connection.secure-ok - out->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); - **/ - - responses.receive(method_bodies.connection_tune); - - ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse()); - out->send(new AMQFrame(version, 0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); - - u_int16_t heartbeat = proposal->getHeartbeat(); - connector->setReadTimeout(heartbeat * 2); - connector->setWriteTimeout(heartbeat); - - //send connection.open - string capabilities; - string vhost = virtualhost; - responses.expect(); - out->send(new AMQFrame(version, 0, new ConnectionOpenBody(version, vhost, capabilities, true))); - //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). - responses.waitForResponse(); - if(responses.validate(method_bodies.connection_open_ok)){ - //ok - }else if(responses.validate(method_bodies.connection_redirect)){ - //ignore for now - ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse())); - std::cout << "Received redirection to " << redirect->getHost() << std::endl; - }else{ - THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); - } - + // Open the special channel 0. + channels[0] = &channel0; + channel0.open(0, *this); + channel0.protocolInit(uid, pwd, virtualhost); } -void Connection::close(){ - if(!closed){ - u_int16_t code(200); - string text("Ok"); - u_int16_t classId(0); - u_int16_t methodId(0); - - sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok); +void Connection::close( + ReplyCode code, const string& msg, ClassId classId, MethodId methodId +) +{ + if(!closed) { + channel0.sendAndReceive<ConnectionCloseOkBody>( + new ConnectionCloseBody( + getVersion(), code, msg, classId, methodId)); connector->close(); } } -void Connection::openChannel(Channel* channel){ - channel->con = this; - channel->id = ++channelIdCounter; - channel->out = out; - channels[channel->id] = channel; - //now send frame to open channel and wait for response - string oob; - channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok); - channel->setQos(); - channel->closed = false; -} - -void Connection::closeChannel(Channel* channel){ - //send frame to close channel - u_int16_t code(200); - string text("Ok"); - u_int16_t classId(0); - u_int16_t methodId(0); - closeChannel(channel, code, text, classId, methodId); +// FIXME aconway 2007-01-26: make channels owned and created by connection? +void Connection::openChannel(Channel& channel) { + ChannelId id = ++channelIdCounter; + assert (channels.find(id) == channels.end()); + assert(out); + channels[id] = &channel; + channel.open(id, *this); } -void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){ - //send frame to close channel - channel->cancelAll(); - channel->closed = true; - channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok); - channel->con = 0; - channel->out = 0; - removeChannel(channel); -} - -void Connection::removeChannel(Channel* channel){ - //send frame to close channel - - channels.erase(channel->id); - channel->out = 0; - channel->id = 0; - channel->con = 0; +void Connection::erase(ChannelId id) { + channels.erase(id); } void Connection::received(AMQFrame* frame){ - AMQBody::shared_ptr body = frame->getBody(); - u_int8_t type = body->type(); - if (type == REQUEST_BODY) - responder.received(AMQRequestBody::getData(body)); - handleFrame(frame); - if (type == RESPONSE_BODY) - requester.processed(AMQResponseBody::getData(body)); -} - -void Connection::handleFrame(AMQFrame* frame){ - u_int16_t channelId = frame->getChannel(); - - if(channelId == 0){ - this->handleBody(frame->getBody()); - }else{ - Channel* channel = channels[channelId]; - if(channel == 0){ - error(504, "Unknown channel"); - }else{ - try{ - channel->handleBody(frame->getBody()); - }catch(qpid::QpidError e){ - channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); - } - } + // FIXME aconway 2007-01-25: Mutex + ChannelId id = frame->getChannel(); + Channel* channel = channels[id]; + // FIXME aconway 2007-01-26: Exception thrown here is hanging the + // client. Need to review use of exceptions. + if (channel == 0) + THROW_QPID_ERROR( + PROTOCOL_ERROR+504, + (boost::format("Invalid channel number %g") % id).str()); + try{ + channel->handleBody(frame->getBody()); + }catch(const qpid::QpidError& e){ + channelException( + *channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); } } -void Connection::handleRequest(AMQRequestBody::shared_ptr body) { - // FIXME aconway 2007-01-19: request/response handling. - handleMethod(body); -} - -void Connection::handleResponse(AMQResponseBody::shared_ptr body) { - // FIXME aconway 2007-01-19: request/response handling. - handleMethod(body); -} - -void Connection::handleMethod(AMQMethodBody::shared_ptr body){ - //connection.close, basic.deliver, basic.return or a response to a synchronous request - if(responses.isWaiting()){ - responses.signalResponse(body); - }else if(method_bodies.connection_close.match(body.get())){ - //send back close ok - //close socket - ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get()); - std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl; - connector->close(); - }else{ - std::cout << "Unhandled method for connection: " << *body << std::endl; - error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); - } -} - -void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){ - error(504, "Channel error: received header body with channel 0."); -} - -void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){ - error(504, "Channel error: received content body with channel 0."); -} - -void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ -} - -void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ - responses.expect(); +void Connection::send(AMQFrame* frame) { out->send(frame); - responses.receive(body); } -void Connection::error(int code, const string& msg, int classid, int methodid){ - std::cout << "Connection exception generated: " << code << msg; - if(classid || methodid){ - std::cout << " [" << methodid << ":" << classid << "]"; - } - std::cout << std::endl; - sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok); - connector->close(); -} - -void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){ - std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl; - int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500; +void Connection::channelException( + Channel& channel, AMQMethodBody* method, const QpidError& e) +{ + int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500; string msg = e.msg; - if(method == 0){ - closeChannel(channel, code, msg); - }else{ - closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId()); - } + if(method == 0) + channel.close(code, msg); + else + channel.close( + code, msg, method->amqpClassId(), method->amqpMethodId()); } void Connection::idleIn(){ @@ -259,9 +142,12 @@ void Connection::idleOut(){ void Connection::shutdown(){ closed = true; - //close all channels - for(iterator i = channels.begin(); i != channels.end(); i++){ - i->second->stop(); + //close all channels, also removes them from the map. + while(!channels.empty()){ + Channel* channel = channels.begin()->second; + if (channel != 0) + channel->close(); } - responses.signalResponse(AMQMethodBody::shared_ptr()); } + +}} // namespace qpid::client diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index 9c9b067f88..6ee9e62e47 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -1,3 +1,6 @@ +#ifndef _Connection_ +#define _Connection_ + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,15 +23,15 @@ */ #include <map> #include <string> +#include <boost/shared_ptr.hpp> -#ifndef _Connection_ -#define _Connection_ - +#include "amqp_types.h" #include <QpidError.h> #include <Connector.h> #include <sys/ShutdownHandler.h> #include <sys/TimeoutHandler.h> +#include "framing/amqp_types.h" #include <framing/amqp_framing.h> #include <ClientExchange.h> #include <IncomingMessage.h> @@ -37,150 +40,152 @@ #include <ClientQueue.h> #include <ResponseHandler.h> #include <AMQP_HighestVersion.h> -#include "Requester.h" -#include "Responder.h" +#include "ClientChannel.h" namespace qpid { +/** + * The client namespace contains all classes that make up a client + * implementation of the AMQP protocol. The key classes that form + * the basis of the client API to be used by applications are + * Connection and Channel. + */ +namespace client { + +class Channel; + +/** + * \internal provide access to selected private channel functions + * for the Connection without making it a friend of the entire channel. + */ +class ConnectionForChannel : + public framing::InputHandler, + public framing::OutputHandler, + public sys::TimeoutHandler, + public sys::ShutdownHandler + +{ + private: + friend class Channel; + virtual void erase(framing::ChannelId) = 0; +}; + + +/** + * \defgroup clientapi Application API for an AMQP client + */ + +/** + * Represents a connection to an AMQP broker. All communication is + * initiated by establishing a connection, then opening one or + * more Channels over that connection. + * + * \ingroup clientapi + */ +class Connection : public ConnectionForChannel +{ + typedef std::map<framing::ChannelId, Channel*> ChannelMap; + + static framing::ChannelId channelIdCounter; + static const std::string OK; + + std::string host; + int port; + const u_int32_t max_frame_size; + ChannelMap channels; + Connector* connector; + framing::OutputHandler* out; + volatile bool closed; + framing::ProtocolVersion version; + + void erase(framing::ChannelId); + void channelException( + Channel&, framing::AMQMethodBody*, const QpidError&); + Channel channel0; + + // TODO aconway 2007-01-26: too many friendships, untagle these classes. + friend class Channel; + + public: + const framing::ProtocolVersion& getVersion() const { return version; } + + /** + * Creates a connection object, but does not open the + * connection. + * + * @param _version the version of the protocol to connect with + * + * @param debug turns on tracing for the connection + * (i.e. prints details of the frames sent and received to std + * out). Optional and defaults to false. + * + * @param max_frame_size the maximum frame size that the + * client will accept. Optional and defaults to 65536. + */ + Connection( + bool debug = false, u_int32_t max_frame_size = 65536, + const framing::ProtocolVersion& = framing::highestProtocolVersion); + ~Connection(); + /** - * The client namespace contains all classes that make up a client - * implementation of the AMQP protocol. The key classes that form - * the basis of the client API to be used by applications are - * Connection and Channel. + * Opens a connection to a broker. + * + * @param host the host on which the broker is running + * + * @param port the port on the which the broker is listening + * + * @param uid the userid to connect with + * + * @param pwd the password to connect with (currently SASL + * PLAIN is the only authentication method supported so this + * is sent in clear text) + * + * @param virtualhost the AMQP virtual host to use (virtual + * hosts, where implemented(!), provide namespace partitioning + * within a single broker). */ -namespace client { + void open(const std::string& host, int port = 5672, + const std::string& uid = "guest", const std::string& pwd = "guest", + const std::string& virtualhost = "/"); - class Channel; /** - * \defgroup clientapi Application API for an AMQP client + * Close the connection with optional error information for the peer. + * + * Any further use of this connection (without reopening it) will + * not succeed. */ + void close(framing::ReplyCode=200, const std::string& msg=OK, + framing::ClassId = 0, framing::MethodId = 0); /** - * Represents a connection to an AMQP broker. All communication is - * initiated by establishing a connection, then opening one or - * more Channels over that connection. + * Associate a Channel with this connection and open it for use. + * + * In AMQP channels are like multi-plexed 'sessions' of work over + * a connection. Almost all the interaction with AMQP is done over + * a channel. * - * \ingroup clientapi + * @param connection the connection object to be associated with + * the channel. Call Channel::close() to close the channel. + */ + void openChannel(Channel&); + + + // TODO aconway 2007-01-26: can these be private? + void send(framing::AMQFrame*); + void received(framing::AMQFrame*); + void idleOut(); + void idleIn(); + void shutdown(); + + /** + * @return the maximum frame size in use on this connection */ - class Connection : public virtual qpid::framing::InputHandler, - public virtual qpid::sys::TimeoutHandler, - public virtual qpid::sys::ShutdownHandler, - private virtual qpid::framing::BodyHandler - { - - typedef std::map<int, Channel*>::iterator iterator; - - static u_int16_t channelIdCounter; - - std::string host; - int port; - const u_int32_t max_frame_size; - std::map<int, Channel*> channels; - Connector* connector; - qpid::framing::OutputHandler* out; - ResponseHandler responses; - volatile bool closed; - framing::ProtocolVersion version; - framing::Requester requester; - framing::Responder responder; - - void channelException(Channel* channel, framing::AMQMethodBody* body, QpidError& e); - void error(int code, const std::string& msg, int classid = 0, int methodid = 0); - void closeChannel(Channel* channel, u_int16_t code, std::string& text, u_int16_t classId = 0, u_int16_t methodId = 0); - void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body); - - // FIXME aconway 2007-01-19: Use channel(0) not connection - // to handle channel 0 requests. Remove handler methods. - // - void handleRequest(framing::AMQRequestBody::shared_ptr); - void handleResponse(framing::AMQResponseBody::shared_ptr); - void handleMethod(framing::AMQMethodBody::shared_ptr); - void handleHeader(framing::AMQHeaderBody::shared_ptr); - void handleContent(framing::AMQContentBody::shared_ptr); - void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr); - void handleFrame(framing::AMQFrame* frame); - - public: - /** - * Creates a connection object, but does not open the - * connection. - * - * @param _version the version of the protocol to connect with - * - * @param debug turns on tracing for the connection - * (i.e. prints details of the frames sent and received to std - * out). Optional and defaults to false. - * - * @param max_frame_size the maximum frame size that the - * client will accept. Optional and defaults to 65536. - */ - Connection( bool debug = false, u_int32_t max_frame_size = 65536, - framing::ProtocolVersion* _version = &(framing::highestProtocolVersion)); - ~Connection(); - - /** - * Opens a connection to a broker. - * - * @param host the host on which the broker is running - * - * @param port the port on the which the broker is listening - * - * @param uid the userid to connect with - * - * @param pwd the password to connect with (currently SASL - * PLAIN is the only authentication method supported so this - * is sent in clear text) - * - * @param virtualhost the AMQP virtual host to use (virtual - * hosts, where implemented(!), provide namespace partitioning - * within a single broker). - */ - void open(const std::string& host, int port = 5672, - const std::string& uid = "guest", const std::string& pwd = "guest", - const std::string& virtualhost = "/"); - /** - * Closes the connection. Any further use of this connection - * (without reopening it) will not succeed. - */ - void close(); - /** - * Opens a Channel. In AMQP channels are like multi-plexed - * 'sessions' of work over a connection. Almost all the - * interaction with AMQP is done over a channel. - * - * @param channel a pointer to a channel instance that will be - * used to represent the new channel. - */ - void openChannel(Channel* channel); - /* - * Requests that the server close this channel, then removes - * the association to the channel from this connection - * - * @param channel a pointer to the channel instance to close - */ - void closeChannel(Channel* channel); - /* - * Removes the channel from association with this connection, - * without sending a close request to the server. - * - * @param channel a pointer to the channel instance to - * disassociate - */ - void removeChannel(Channel* channel); - - virtual void received(framing::AMQFrame* frame); - - virtual void idleOut(); - virtual void idleIn(); - - virtual void shutdown(); - - /** - * @return the maximum frame size in use on this connection - */ - inline u_int32_t getMaxFrameSize(){ return max_frame_size; } - }; + inline u_int32_t getMaxFrameSize(){ return max_frame_size; } + + /** @return protocol version in use on this connection. */ + const framing::ProtocolVersion& getVersion() { return version; } +}; } } diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp index d05540ba32..425cecaf6f 100644 --- a/cpp/lib/client/Connector.cpp +++ b/cpp/lib/client/Connector.cpp @@ -22,8 +22,6 @@ #include <QpidError.h> #include <sys/Time.h> #include "Connector.h" -#include "Requester.h" -#include "Responder.h" using namespace qpid::sys; using namespace qpid::client; @@ -31,7 +29,6 @@ using namespace qpid::framing; using qpid::QpidError; Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, - Requester& req, Responder& resp, bool _debug, u_int32_t buffer_size) : debug(_debug), receive_buffer_size(buffer_size), @@ -44,9 +41,7 @@ Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, timeoutHandler(0), shutdownHandler(0), inbuf(receive_buffer_size), - outbuf(send_buffer_size), - requester(req), - responder(resp) + outbuf(send_buffer_size) { } Connector::~Connector(){ } @@ -58,9 +53,9 @@ void Connector::connect(const std::string& host, int port){ receiver = Thread(this); } -void Connector::init(ProtocolInitiation* header){ - writeBlock(header); - delete header; +void Connector::init(){ + ProtocolInitiation init(version); + writeBlock(&init); } void Connector::close(){ @@ -81,16 +76,11 @@ OutputHandler* Connector::getOutputHandler(){ return this; } -void Connector::send(AMQFrame* frame){ +void Connector::send(AMQFrame* f){ + std::auto_ptr<AMQFrame> frame(f); AMQBody::shared_ptr body = frame->getBody(); - u_int8_t type = body->type(); - if (type == REQUEST_BODY) - requester.sending(AMQRequestBody::getData(body)); - else if (type == RESPONSE_BODY) - responder.sending(AMQResponseBody::getData(body)); - writeBlock(frame); + writeBlock(frame.get()); if(debug) std::cout << "SENT: " << *frame << std::endl; - delete frame; } void Connector::writeBlock(AMQDataBlock* data){ @@ -185,10 +175,8 @@ void Connector::run(){ inbuf.compact(); } } - }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg - << " (" << error.location.file << ":" << error.location.line - << ")" << std::endl; + } catch (const std::exception& e) { + std::cout << e.what() << std::endl; handleClosed(); } } diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h index 02926b2bdb..40663486f2 100644 --- a/cpp/lib/client/Connector.h +++ b/cpp/lib/client/Connector.h @@ -35,13 +35,6 @@ namespace qpid { -namespace framing { - -class Requester; -class Responder; - -} // namespace framing - namespace client { class Connector : public qpid::framing::OutputHandler, @@ -74,9 +67,6 @@ class Connector : public qpid::framing::OutputHandler, qpid::sys::Socket socket; - qpid::framing::Requester& requester; - qpid::framing::Responder& responder; - void checkIdle(ssize_t status); void writeBlock(qpid::framing::AMQDataBlock* data); void writeToSocket(char* data, size_t available); @@ -85,13 +75,13 @@ class Connector : public qpid::framing::OutputHandler, void run(); void handleClosed(); + friend class Channel; public: Connector(const qpid::framing::ProtocolVersion& pVersion, - qpid::framing::Requester& req, qpid::framing::Responder& resp, bool debug = false, u_int32_t buffer_size = 1024); virtual ~Connector(); virtual void connect(const std::string& host, int port); - virtual void init(qpid::framing::ProtocolInitiation* header); + virtual void init(); virtual void close(); virtual void setInputHandler(qpid::framing::InputHandler* handler); virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler); diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp index 68c7e52013..8950138f5e 100644 --- a/cpp/lib/client/ResponseHandler.cpp +++ b/cpp/lib/client/ResponseHandler.cpp @@ -18,27 +18,35 @@ * under the License. * */ +#include <boost/format.hpp> + #include <ResponseHandler.h> #include <sys/Monitor.h> #include <QpidError.h> +#include "amqp_types.h" using namespace qpid::sys; +using namespace qpid::framing; + +namespace qpid { +namespace client { -qpid::client::ResponseHandler::ResponseHandler() : waiting(false){} +ResponseHandler::ResponseHandler() : waiting(false){} -qpid::client::ResponseHandler::~ResponseHandler(){} +ResponseHandler::~ResponseHandler(){} -bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){ - return response != 0 && expected.match(response.get()); +bool ResponseHandler::validate(ClassId c, MethodId m) { + return response != 0 && + response->amqpClassId() ==c && response->amqpMethodId() == m; } -void qpid::client::ResponseHandler::waitForResponse(){ +void ResponseHandler::waitForResponse(){ Monitor::ScopedLock l(monitor); while (waiting) monitor.wait(); } -void qpid::client::ResponseHandler::signalResponse( +void ResponseHandler::signalResponse( qpid::framing::AMQMethodBody::shared_ptr _response) { Monitor::ScopedLock l(monitor); @@ -47,15 +55,26 @@ void qpid::client::ResponseHandler::signalResponse( monitor.notify(); } -void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){ +void ResponseHandler::receive(ClassId c, MethodId m) { Monitor::ScopedLock l(monitor); while (waiting) monitor.wait(); - if(!validate(expected)){ - THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error"); + if (!response) { + THROW_QPID_ERROR( + PROTOCOL_ERROR, "Channel closed unexpectedly."); + } + if(!validate(response->amqpClassId(), response->amqpMethodId())) { + THROW_QPID_ERROR( + PROTOCOL_ERROR, + (boost::format( + "Expected class:method %d:%d, got %d:%d") + % c % m % response->amqpClassId() % response->amqpMethodId() + ).str()); } } -void qpid::client::ResponseHandler::expect(){ +void ResponseHandler::expect(){ waiting = true; } + +}} // namespace qpid::client diff --git a/cpp/lib/client/ResponseHandler.h b/cpp/lib/client/ResponseHandler.h index c3d499d046..c402bcbc65 100644 --- a/cpp/lib/client/ResponseHandler.h +++ b/cpp/lib/client/ResponseHandler.h @@ -19,6 +19,8 @@ * */ #include <string> + +#include <amqp_types.h> #include <framing/amqp_framing.h> #include <sys/Monitor.h> @@ -26,26 +28,39 @@ #define _ResponseHandler_ namespace qpid { - namespace client { - - class ResponseHandler{ - bool waiting; - qpid::framing::AMQMethodBody::shared_ptr response; - qpid::sys::Monitor monitor; - - public: - ResponseHandler(); - ~ResponseHandler(); - inline bool isWaiting(){ return waiting; } - inline qpid::framing::AMQMethodBody::shared_ptr getResponse(){ return response; } - bool validate(const qpid::framing::AMQMethodBody& expected); - void waitForResponse(); - void signalResponse(qpid::framing::AMQMethodBody::shared_ptr response); - void receive(const qpid::framing::AMQMethodBody& expected); - void expect();//must be called before calling receive - }; +namespace client { + +/** + * Holds a response from the broker peer for the client. + */ +class ResponseHandler{ + bool waiting; + qpid::framing::AMQMethodBody::shared_ptr response; + qpid::sys::Monitor monitor; + + public: + ResponseHandler(); + ~ResponseHandler(); + + bool isWaiting(){ return waiting; } + framing::AMQMethodBody::shared_ptr getResponse(){ return response;} + void waitForResponse(); + + void signalResponse(framing::AMQMethodBody::shared_ptr response); + + void expect();//must be called before calling receive + bool validate(framing::ClassId, framing::MethodId); + void receive(framing::ClassId, framing::MethodId); + template <class BodyType> bool validate() { + return validate(BodyType::CLASS_ID, BodyType::METHOD_ID); } + template <class BodyType> void receive() { + return receive(BodyType::CLASS_ID, BodyType::METHOD_ID); + } +}; + +} } |
