summaryrefslogtreecommitdiff
path: root/cpp/lib/client
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-29 16:13:24 +0000
committerAlan Conway <aconway@apache.org>2007-01-29 16:13:24 +0000
commit5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d (patch)
treef9a982b65400154a86edd02faf75da143a96404c /cpp/lib/client
parent5d28464c46c1e64ded078a4585f0f49e30b8b5d6 (diff)
downloadqpid-python-5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d.tar.gz
* Added ClientAdapter - client side ChannelAdapter. Updated client side.
* Moved ChannelAdapter initialization from ctor to init(), updated broker side. * Improved various exception messages with boost::format messages. * Removed unnecssary virtual inheritance. * Widespread: fixed incorrect non-const ProtocolVersion& parameters. * Client API: pass channels by reference, not pointer. * codegen: - MethodBodyClass.h.templ: Added CLASS_ID, METHOD_ID and isA() template. - Various: fixed non-const ProtocolVersion& parameters. * cpp/bootstrap: Allow config arguments with -build. * cpp/gen/Makefile.am: Merged codegen fixes from trunk. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client')
-rw-r--r--cpp/lib/client/ClientAdapter.cpp70
-rw-r--r--cpp/lib/client/ClientAdapter.h66
-rw-r--r--cpp/lib/client/ClientChannel.cpp408
-rw-r--r--cpp/lib/client/ClientChannel.h550
-rw-r--r--cpp/lib/client/Connection.cpp254
-rw-r--r--cpp/lib/client/Connection.h273
-rw-r--r--cpp/lib/client/Connector.cpp30
-rw-r--r--cpp/lib/client/Connector.h14
-rw-r--r--cpp/lib/client/ResponseHandler.cpp39
-rw-r--r--cpp/lib/client/ResponseHandler.h51
10 files changed, 960 insertions, 795 deletions
diff --git a/cpp/lib/client/ClientAdapter.cpp b/cpp/lib/client/ClientAdapter.cpp
new file mode 100644
index 0000000000..c77f049c96
--- /dev/null
+++ b/cpp/lib/client/ClientAdapter.cpp
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "AMQP_ClientOperations.h"
+#include "ClientAdapter.h"
+#include "Connection.h"
+#include "Exception.h"
+#include "AMQMethodBody.h"
+
+namespace qpid {
+namespace client {
+
+using namespace qpid;
+using namespace qpid::framing;
+
+typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+void ClientAdapter::handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const MethodContext& context
+)
+{
+ try{
+ method->invoke(*clientOps, context);
+ }catch(ChannelException& e){
+ connection.client->getChannel().close(
+ context, e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ connection.closeChannel(getId());
+ }catch(ConnectionException& e){
+ connection.client->getConnection().close(
+ context, e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ connection.client->getConnection().close(
+ context, 541/*internal error*/, e.what(),
+ method->amqpClassId(), method->amqpMethodId());
+ }
+}
+
+void ClientAdapter::handleHeader(AMQHeaderBody::shared_ptr body) {
+ channel->handleHeader(body);
+}
+
+void ClientAdapter::handleContent(AMQContentBody::shared_ptr body) {
+ channel->handleContent(body);
+}
+
+void ClientAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
+ // TODO aconway 2007-01-17: Implement heartbeats.
+}
+
+
+
+}} // namespace qpid::client
+
diff --git a/cpp/lib/client/ClientAdapter.h b/cpp/lib/client/ClientAdapter.h
new file mode 100644
index 0000000000..d5e16fc6ad
--- /dev/null
+++ b/cpp/lib/client/ClientAdapter.h
@@ -0,0 +1,66 @@
+#ifndef _client_ClientAdapter_h
+#define _client_ClientAdapter_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "ChannelAdapter.h"
+#include "ClientChannel.h"
+
+namespace qpid {
+namespace client {
+
+class AMQMethodBody;
+class Connection;
+
+/**
+ * Per-channel protocol adapter.
+ *
+ * Translates protocol bodies into calls on the core Channel,
+ * Connection and Client objects.
+ *
+ * Owns a channel, has references to Connection and Client.
+ */
+class ClientAdapter : public framing::ChannelAdapter
+{
+ public:
+ ClientAdapter(std::auto_ptr<Channel> ch, Connection&, Client&);
+ Channel& getChannel() { return *channel; }
+
+ void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
+ void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
+ void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
+
+ private:
+ void handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const framing::MethodContext& context);
+
+ class ClientOps;
+
+ std::auto_ptr<Channel> channel;
+ Connection& connection;
+ Client& client;
+ boost::shared_ptr<ClientOps> clientOps;
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!_client_ClientAdapter_h*/
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index d9edb2f390..b93596ebfc 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -23,42 +23,115 @@
#include <ClientMessage.h>
#include <QpidError.h>
#include <MethodBodyInstances.h>
+#include "Connection.h"
+
+// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
+// handling of errors that should close the connection or the channel.
+// Make sure the user thread receives a connection in each case.
+//
using namespace boost; //to use dynamic_pointer_cast
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
+const std::string Channel::OK("OK");
+
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
- id(0),
- con(0),
- out(0),
+ connection(0),
incoming(0),
- closed(true),
prefetch(_prefetch),
- transactional(_transactional),
-// AMQP version management change - kpvdr 2006-11-20
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
- version(8, 0)
+ transactional(_transactional)
{ }
Channel::~Channel(){
- stop();
+ close();
+}
+
+void Channel::open(ChannelId id, Connection& con)
+{
+ if (isOpen())
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id);
+ connection = &con;
+ init(id, con, con.getVersion()); // ChannelAdapter initialization.
+ string oob;
+ if (id != 0)
+ sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob));
+}
+
+void Channel::protocolInit(
+ const std::string& uid, const std::string& pwd, const std::string& vhost) {
+ assert(connection);
+ responses.expect();
+ connection->connector->init(); // Send ProtocolInit block.
+ responses.receive<ConnectionStartBody>();
+
+ FieldTable props;
+ string mechanism("PLAIN");
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ string locale("en_US");
+ // TODO aconway 2007-01-26: Move client over to proxy model,
+ // symmetric with server.
+ ConnectionTuneBody::shared_ptr proposal =
+ sendAndReceive<ConnectionTuneBody>(
+ new ConnectionStartOkBody(
+ version, props, mechanism, response, locale));
+
+ /**
+ * Assume for now that further challenges will not be required
+ //receive connection.secure
+ responses.receive(connection_secure));
+ //send connection.secure-ok
+ connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
+ **/
+
+ connection->send(
+ new AMQFrame(
+ version, 0,
+ new ConnectionTuneOkBody(
+ version, proposal->getChannelMax(),
+ connection->getMaxFrameSize(),
+ proposal->getHeartbeat())));
+
+ u_int16_t heartbeat = proposal->getHeartbeat();
+ connection->connector->setReadTimeout(heartbeat * 2);
+ connection->connector->setWriteTimeout(heartbeat);
+
+ // Send connection open.
+ std::string capabilities;
+ responses.expect();
+ send(new AMQFrame(
+ version, 0,
+ new ConnectionOpenBody(version, vhost, capabilities, true)));
+ //receive connection.open-ok (or redirect, but ignore that for now
+ //esp. as using force=true).
+ responses.waitForResponse();
+ if(responses.validate<ConnectionOpenOkBody>()) {
+ //ok
+ }else if(responses.validate<ConnectionRedirectBody>()){
+ //ignore for now
+ ConnectionRedirectBody::shared_ptr redirect(
+ shared_polymorphic_downcast<ConnectionRedirectBody>(
+ responses.getResponse()));
+ std::cout << "Received redirection to " << redirect->getHost()
+ << std::endl;
+ } else {
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
+ }
}
+
+bool Channel::isOpen() const { return connection; }
void Channel::setPrefetch(u_int16_t _prefetch){
prefetch = _prefetch;
- if(con != 0 && out != 0){
- setQos();
- }
+ setQos();
}
void Channel::setQos(){
-// AMQP version management change - kpvdr 2006-11-20
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
- sendAndReceive(new AMQFrame(version, id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok);
+ sendAndReceive<BasicQosOkBody>(
+ new BasicQosBody(version, 0, prefetch, false));
if(transactional){
- sendAndReceive(new AMQFrame(version, id, new TxSelectBody(version)), method_bodies.tx_select_ok);
+ sendAndReceive<TxSelectOkBody>(new TxSelectBody(version));
}
}
@@ -66,62 +139,51 @@ void Channel::declareExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
string type = exchange.getType();
FieldTable args;
- AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args));
- if(synch){
- sendAndReceive(frame, method_bodies.exchange_declare_ok);
- }else{
- out->send(frame);
- }
+ sendAndReceiveSync<ExchangeDeclareOkBody>(
+ synch,
+ new ExchangeDeclareBody(
+ version, 0, name, type, false, false, false, false, !synch, args));
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
- AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeleteBody(version, 0, name, false, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.exchange_delete_ok);
- }else{
- out->send(frame);
- }
+ sendAndReceiveSync<ExchangeDeleteOkBody>(
+ synch,
+ new ExchangeDeleteBody(version, 0, name, false, !synch));
}
void Channel::declareQueue(Queue& queue, bool synch){
string name = queue.getName();
FieldTable args;
- AMQFrame* frame = new AMQFrame(version, id, new QueueDeclareBody(version, 0, name, false, false,
- queue.isExclusive(),
- queue.isAutoDelete(), !synch, args));
- if(synch){
- sendAndReceive(frame, method_bodies.queue_declare_ok);
+ sendAndReceiveSync<QueueDeclareOkBody>(
+ synch,
+ new QueueDeclareBody(
+ version, 0, name, false, false,
+ queue.isExclusive(), queue.isAutoDelete(), !synch, args));
+ if (synch) {
if(queue.getName().length() == 0){
QueueDeclareOkBody::shared_ptr response =
- dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse());
+ shared_polymorphic_downcast<QueueDeclareOkBody>(
+ responses.getResponse());
queue.setName(response->getQueue());
}
- }else{
- out->send(frame);
}
}
void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
//ticket, queue, ifunused, ifempty, nowait
string name = queue.getName();
- AMQFrame* frame = new AMQFrame(version, id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.queue_delete_ok);
- }else{
- out->send(frame);
- }
+ sendAndReceiveSync<QueueDeleteOkBody>(
+ synch,
+ new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
}
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- AMQFrame* frame = new AMQFrame(version, id, new QueueBindBody(version, 0, q, e, key,!synch, args));
- if(synch){
- sendAndReceive(frame, method_bodies.queue_bind_ok);
- }else{
- out->send(frame);
- }
+ sendAndReceiveSync<QueueBindOkBody>(
+ synch,
+ new QueueBindBody(version, 0, q, e, key,!synch, args));
}
void Channel::consume(
@@ -129,52 +191,48 @@ void Channel::consume(
int ackMode, bool noLocal, bool synch, const FieldTable* fields)
{
string q = queue.getName();
- AMQFrame* frame =
- new AMQFrame(version,
- id,
- new BasicConsumeBody(
- version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable()));
- if(synch){
- sendAndReceive(frame, method_bodies.basic_consume_ok);
- BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse());
+ sendAndReceiveSync<BasicConsumeOkBody>(
+ synch,
+ new BasicConsumeBody(
+ version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
+ fields ? *fields : FieldTable()));
+ if (synch) {
+ BasicConsumeOkBody::shared_ptr response =
+ shared_polymorphic_downcast<BasicConsumeOkBody>(
+ responses.getResponse());
tag = response->getConsumerTag();
- }else{
- out->send(frame);
- }
- Consumer* c = new Consumer();
- c->listener = listener;
- c->ackMode = ackMode;
- c->lastDeliveryTag = 0;
- consumers[tag] = c;
-}
-
-void Channel::cancel(std::string& tag, bool synch){
- Consumer* c = consumers[tag];
- if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true)));
}
-
- AMQFrame* frame = new AMQFrame(version, id, new BasicCancelBody(version, (string&) tag, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.basic_cancel_ok);
- }else{
- out->send(frame);
- }
- consumers.erase(tag);
- if(c != 0){
- delete c;
+ Consumer& c = consumers[tag];
+ c.listener = listener;
+ c.ackMode = ackMode;
+ c.lastDeliveryTag = 0;
+}
+
+void Channel::cancel(const std::string& tag, bool synch) {
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i != consumers.end()) {
+ Consumer& c = i->second;
+ if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+ send(new BasicAckBody(version, c.lastDeliveryTag, true));
+ sendAndReceiveSync<BasicCancelOkBody>(
+ synch, new BasicCancelBody(version, tag, !synch));
+ consumers.erase(tag);
}
}
void Channel::cancelAll(){
- for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){
- Consumer* c = i->second;
- if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true)));
+ while(!consumers.empty()) {
+ Consumer c = consumers.begin()->second;
+ consumers.erase(consumers.begin());
+ if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
+ && c.lastDeliveryTag > 0)
+ {
+ // Let exceptions propagate, if one fails no point
+ // trying the rest. NB no memory leaks if we do,
+ // ConsumerMap holds values, not pointers.
+ //
+ send(new BasicAckBody(version, c.lastDeliveryTag, true));
}
- consumers.erase(i);
- delete c;
}
}
@@ -191,26 +249,28 @@ void Channel::retrieve(Message& msg){
retrieved = 0;
}
-bool Channel::get(Message& msg, const Queue& queue, int ackMode){
+bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
string name = queue.getName();
- AMQFrame* frame = new AMQFrame(version, id, new BasicGetBody(version, 0, name, ackMode));
+ AMQBody::shared_ptr body(new BasicGetBody(version, 0, name, ackMode));
responses.expect();
- out->send(frame);
+ send(body);
responses.waitForResponse();
AMQMethodBody::shared_ptr response = responses.getResponse();
- if(method_bodies.basic_get_ok.match(response.get())){
+ if(response->isA<BasicGetOkBody>()) {
if(incoming != 0){
std::cout << "Existing message not complete" << std::endl;
+ // FIXME aconway 2007-01-26: close the connection? the channel?
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
}else{
incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
}
retrieve(msg);
return true;
- }if(method_bodies.basic_get_empty.match(response.get())){
+ }if(response->isA<BasicGetEmptyBody>()){
return false;
}else{
- THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get.");
+ // FIXME aconway 2007-01-26: must close the connection.
+ THROW_QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame");
}
}
@@ -219,25 +279,24 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string&
string e = exchange.getName();
string key = routingKey;
- out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
+ send(new BasicPublishBody(version, 0, e, key, mandatory, immediate));
//break msg up into header frame and content frame(s) and send these
string data = msg.getData();
msg.header->setContentSize(data.length());
- AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header));
- out->send(new AMQFrame(version, id, body));
+ send(msg.header);
u_int64_t data_length = data.length();
if(data_length > 0){
- u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes
+ u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes
if(data_length < frag_size){
- out->send(new AMQFrame(version, id, new AMQContentBody(data)));
+ send(new AMQContentBody(data));
}else{
u_int32_t offset = 0;
u_int32_t remaining = data_length - offset;
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(data.substr(offset, length));
- out->send(new AMQFrame(version, id, new AMQContentBody(frag)));
+ send(new AMQContentBody(frag));
offset += length;
remaining = data_length - offset;
@@ -247,56 +306,48 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string&
}
void Channel::commit(){
- AMQFrame* frame = new AMQFrame(version, id, new TxCommitBody(version));
- sendAndReceive(frame, method_bodies.tx_commit_ok);
+ sendAndReceive<TxCommitOkBody>(new TxCommitBody(version));
}
void Channel::rollback(){
- AMQFrame* frame = new AMQFrame(version, id, new TxRollbackBody(version));
- sendAndReceive(frame, method_bodies.tx_rollback_ok);
-}
-
-void Channel::handleRequest(AMQRequestBody::shared_ptr body) {
- // FIXME aconway 2007-01-19: request/response handling.
- handleMethod(body);
+ sendAndReceive<TxRollbackOkBody>(new TxRollbackBody(version));
}
-void Channel::handleResponse(AMQResponseBody::shared_ptr body) {
- // FIXME aconway 2007-01-19: request/response handling.
- handleMethod(body);
-}
-
-void Channel::handleMethod(AMQMethodBody::shared_ptr body){
- //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request
+void Channel::handleMethodInContext(
+ AMQMethodBody::shared_ptr body, const MethodContext&)
+{
+ //channel.flow, channel.close, basic.deliver, basic.return or a
+ //response to a synchronous request
if(responses.isWaiting()){
responses.signalResponse(body);
- }else if(method_bodies.basic_deliver.match(body.get())){
+ }else if(body->isA<BasicDeliverBody>()) {
if(incoming != 0){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
}else{
incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
}
- }else if(method_bodies.basic_return.match(body.get())){
+ }else if(body->isA<BasicReturnBody>()){
if(incoming != 0){
std::cout << "Existing message not complete" << std::endl;
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
}else{
incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
}
- }else if(method_bodies.channel_close.match(body.get())){
- con->removeChannel(this);
- //need to signal application that channel has been closed through exception
-
- }else if(method_bodies.channel_flow.match(body.get())){
-
+ }else if(body->isA<ChannelCloseBody>()){
+ peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body));
+ }else if(body->isA<ChannelFlowBody>()){
+ // TODO aconway 2007-01-24:
+ }else if(body->isA<ConnectionCloseBody>()){
+ connection->close();
}else{
- //signal error
- std::cout << "Unhandled method: " << *body << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method");
+ connection->close(
+ 504, "Unrecognised method",
+ body->amqpClassId(), body->amqpMethodId());
}
}
-
+
void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
if(incoming == 0){
//handle invalid frame sequence
@@ -331,27 +382,16 @@ void Channel::start(){
dispatcher = Thread(this);
}
-void Channel::stop(){
- {
- Monitor::ScopedLock l(dispatchMonitor);
- closed = true;
- responses.signalResponse(AMQMethodBody::shared_ptr());
- dispatchMonitor.notify();
- }
- dispatcher.join();
-}
-
void Channel::run(){
dispatch();
}
void Channel::enqueue(){
+ Monitor::ScopedLock l(retrievalMonitor);
if(incoming->isResponse()){
- Monitor::ScopedLock l(retrievalMonitor);
retrieved = incoming;
retrievalMonitor.notify();
}else{
- Monitor::ScopedLock l(dispatchMonitor);
messages.push(incoming);
dispatchMonitor.notify();
}
@@ -360,7 +400,7 @@ void Channel::enqueue(){
IncomingMessage* Channel::dequeue(){
Monitor::ScopedLock l(dispatchMonitor);
- while(messages.empty() && !closed){
+ while(messages.empty() && isOpen()){
dispatchMonitor.wait();
}
IncomingMessage* msg = 0;
@@ -371,25 +411,25 @@ IncomingMessage* Channel::dequeue(){
return msg;
}
-void Channel::deliver(Consumer* consumer, Message& msg){
+void Channel::deliver(Consumer& consumer, Message& msg){
//record delivery tag:
- consumer->lastDeliveryTag = msg.getDeliveryTag();
+ consumer.lastDeliveryTag = msg.getDeliveryTag();
//allow registered listener to handle the message
- consumer->listener->received(msg);
+ consumer.listener->received(msg);
//if the handler calls close on the channel or connection while
//handling this message, then consumer will now have been deleted.
- if(!closed){
+ if(isOpen()){
bool multiple(false);
- switch(consumer->ackMode){
- case LAZY_ACK:
+ switch(consumer.ackMode){
+ case LAZY_ACK:
multiple = true;
- if(++(consumer->count) < prefetch) break;
+ if(++(consumer.count) < prefetch) break;
//else drop-through
- case AUTO_ACK:
- out->send(new AMQFrame(version, id, new BasicAckBody(version, msg.getDeliveryTag(), multiple)));
- consumer->lastDeliveryTag = 0;
+ case AUTO_ACK:
+ send(new BasicAckBody(version, msg.getDeliveryTag(), multiple));
+ consumer.lastDeliveryTag = 0;
}
}
@@ -399,7 +439,7 @@ void Channel::deliver(Consumer* consumer, Message& msg){
}
void Channel::dispatch(){
- while(!closed){
+ while(isOpen()){
IncomingMessage* incomingMsg = dequeue();
if(incomingMsg){
//Note: msg is currently only valid for duration of this call
@@ -416,12 +456,10 @@ void Channel::dispatch(){
msg.deliveryTag = incomingMsg->getDeliveryTag();
std::string tag = incomingMsg->getConsumerTag();
- if(consumers[tag] == 0){
- //signal error
+ if(consumers.find(tag) == consumers.end())
std::cout << "Unknown consumer: " << tag << std::endl;
- }else{
+ else
deliver(consumers[tag], msg);
- }
}
delete incomingMsg;
}
@@ -432,14 +470,60 @@ void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
returnsHandler = handler;
}
-void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
- responses.expect();
- out->send(frame);
- responses.receive(body);
+// Close called by local application.
+void Channel::close(
+ u_int16_t code, const std::string& text,
+ ClassId classId, MethodId methodId)
+{
+ // FIXME aconway 2007-01-26: Locking?
+ if (getId() != 0 && isOpen()) {
+ try {
+ sendAndReceive<ChannelCloseOkBody>(
+ new ChannelCloseBody(version, code, text, classId, methodId));
+ cancelAll();
+ closeInternal();
+ } catch (...) {
+ closeInternal();
+ throw;
+ }
+ }
+}
+
+// Channel closed by peer.
+void Channel::peerClose(ChannelCloseBody::shared_ptr) {
+ assert(isOpen());
+ closeInternal();
+ // FIXME aconway 2007-01-26: How to throw the proper exception
+ // to the application thread?
}
-void Channel::close(){
- if(con != 0){
- con->closeChannel(this);
+void Channel::closeInternal() {
+ assert(isOpen());
+ {
+ Monitor::ScopedLock l(dispatchMonitor);
+ static_cast<ConnectionForChannel*>(connection)->erase(getId());
+ connection = 0;
+ // A 0 response means we are closed.
+ responses.signalResponse(AMQMethodBody::shared_ptr());
+ dispatchMonitor.notify();
}
+ dispatcher.join();
}
+
+void Channel::sendAndReceive(AMQBody* toSend, ClassId c, MethodId m)
+{
+ responses.expect();
+ send(toSend);
+ responses.receive(c, m);
+}
+
+void Channel::sendAndReceiveSync(
+ bool sync, AMQBody* body, ClassId c, MethodId m)
+{
+ if(sync)
+ sendAndReceive(body, c, m);
+ else
+ send(body);
+}
+
+
diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h
index e7bab8b4ee..67274ddfc4 100644
--- a/cpp/lib/client/ClientChannel.h
+++ b/cpp/lib/client/ClientChannel.h
@@ -27,7 +27,6 @@
#include "sys/types.h"
#include <framing/amqp_framing.h>
-#include <Connection.h>
#include <ClientExchange.h>
#include <IncomingMessage.h>
#include <ClientMessage.h>
@@ -35,86 +34,126 @@
#include <ClientQueue.h>
#include <ResponseHandler.h>
#include <ReturnedMessageHandler.h>
+#include "Runnable.h"
+#include "ChannelAdapter.h"
+#include "Thread.h"
namespace qpid {
+namespace framing {
+class ChannelCloseBody;
+}
+
namespace client {
- /**
- * The available acknowledgements modes
- *
- * \ingroup clientapi
- */
- enum ack_modes {
- /** No acknowledgement will be sent, broker can
- discard messages as soon as they are delivered
- to a consumer using this mode. **/
- NO_ACK = 0,
- /** Each message will be automatically
- acknowledged as soon as it is delivered to the
- application **/
- AUTO_ACK = 1,
- /** Acknowledgements will be sent automatically,
- but not for each message. **/
- LAZY_ACK = 2,
- /** The application is responsible for explicitly
- acknowledging messages. **/
- CLIENT_ACK = 3
+
+class Connection;
+
+/**
+ * The available acknowledgements modes
+ *
+ * \ingroup clientapi
+ */
+enum ack_modes {
+ /** No acknowledgement will be sent, broker can
+ discard messages as soon as they are delivered
+ to a consumer using this mode. **/
+ NO_ACK = 0,
+ /** Each message will be automatically
+ acknowledged as soon as it is delivered to the
+ application **/
+ AUTO_ACK = 1,
+ /** Acknowledgements will be sent automatically,
+ but not for each message. **/
+ LAZY_ACK = 2,
+ /** The application is responsible for explicitly
+ acknowledging messages. **/
+ CLIENT_ACK = 3
+};
+
+/**
+ * Represents an AMQP channel, i.e. loosely a session of work. It
+ * is through a channel that most of the AMQP 'methods' are
+ * exposed.
+ *
+ * \ingroup clientapi
+ */
+class Channel : public framing::ChannelAdapter,
+ public sys::Runnable
+{
+ struct Consumer{
+ MessageListener* listener;
+ int ackMode;
+ int count;
+ u_int64_t lastDeliveryTag;
};
+ typedef std::map<std::string, Consumer> ConsumerMap;
+ static const std::string OK;
+
+ Connection* connection;
+ sys::Thread dispatcher;
+ IncomingMessage* incoming;
+ ResponseHandler responses;
+ std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
+ IncomingMessage* retrieved;//holds response to basic.get
+ sys::Monitor dispatchMonitor;
+ sys::Monitor retrievalMonitor;
+ ConsumerMap consumers;
+ ReturnedMessageHandler* returnsHandler;
- /**
- * Represents an AMQP channel, i.e. loosely a session of work. It
- * is through a channel that most of the AMQP 'methods' are
- * exposed.
- *
- * \ingroup clientapi
- */
- class Channel : private virtual framing::BodyHandler,
- public virtual sys::Runnable
- {
- struct Consumer{
- MessageListener* listener;
- int ackMode;
- int count;
- u_int64_t lastDeliveryTag;
- };
- typedef std::map<std::string,Consumer*>::iterator consumer_iterator;
+ u_int16_t prefetch;
+ const bool transactional;
+ framing::ProtocolVersion version;
+
+ void enqueue();
+ void retrieve(Message& msg);
+ IncomingMessage* dequeue();
+ void dispatch();
+ void deliver(Consumer& consumer, Message& msg);
+
+ void handleHeader(framing::AMQHeaderBody::shared_ptr body);
+ void handleContent(framing::AMQContentBody::shared_ptr body);
+ void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
+
+ void handleMethodInContext(
+ framing::AMQMethodBody::shared_ptr,
+ const framing::MethodContext& method);
+ void setQos();
+ void cancelAll();
- u_int16_t id;
- Connection* con;
- sys::Thread dispatcher;
- framing::OutputHandler* out;
- IncomingMessage* incoming;
- ResponseHandler responses;
- std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
- IncomingMessage* retrieved;//holds response to basic.get
- sys::Monitor dispatchMonitor;
- sys::Monitor retrievalMonitor;
- std::map<std::string, Consumer*> consumers;
- ReturnedMessageHandler* returnsHandler;
- bool closed;
+ void protocolInit(
+ const std::string& uid, const std::string& pwd,
+ const std::string& vhost);
+
+ void sendAndReceive(
+ framing::AMQBody*, framing::ClassId, framing::MethodId);
- u_int16_t prefetch;
- const bool transactional;
- framing::ProtocolVersion version;
+ void sendAndReceiveSync(
+ bool sync,
+ framing::AMQBody*, framing::ClassId, framing::MethodId);
- void enqueue();
- void retrieve(Message& msg);
- IncomingMessage* dequeue();
- void dispatch();
- void stop();
- void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body);
- void deliver(Consumer* consumer, Message& msg);
- void setQos();
- void cancelAll();
+ template <class BodyType>
+ boost::shared_ptr<BodyType> sendAndReceive(framing::AMQBody* body) {
+ sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID);
+ return boost::shared_polymorphic_downcast<BodyType>(
+ responses.getResponse());
+ }
- virtual void handleMethod(framing::AMQMethodBody::shared_ptr body);
- virtual void handleHeader(framing::AMQHeaderBody::shared_ptr body);
- virtual void handleContent(framing::AMQContentBody::shared_ptr body);
- virtual void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
- void handleRequest(framing::AMQRequestBody::shared_ptr);
- void handleResponse(framing::AMQResponseBody::shared_ptr);
+ template <class BodyType> void sendAndReceiveSync(
+ bool sync, framing::AMQBody* body) {
+ sendAndReceiveSync(
+ sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID);
+ }
- public:
- /**
+ void open(framing::ChannelId, Connection&);
+ void closeInternal();
+ void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
+
+ friend class Connection;
+
+ public:
+
+ bool isOpen() const;
+
+ /**
* Creates a channel object.
*
* @param transactional if true, the publishing and acknowledgement
@@ -124,199 +163,202 @@ namespace client {
* @param prefetch specifies the number of unacknowledged
* messages the channel is willing to have sent to it
* asynchronously
- */
+ */
Channel(bool transactional = false, u_int16_t prefetch = 500);
~Channel();
- /**
- * Declares an exchange.
- *
- * In AMQP Exchanges are the destinations to which messages
- * are published. They have Queues bound to them and route
- * messages they receive to those queues. The routing rules
- * depend on the type of the exchange.
- *
- * @param exchange an Exchange object representing the
- * exchange to declare
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void declareExchange(Exchange& exchange, bool synch = true);
- /**
- * Deletes an exchange
- *
- * @param exchange an Exchange object representing the exchange to delete
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void deleteExchange(Exchange& exchange, bool synch = true);
- /**
- * Declares a Queue
- *
- * @param queue a Queue object representing the queue to declare
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void declareQueue(Queue& queue, bool synch = true);
- /**
- * Deletes a Queue
- *
- * @param queue a Queue object representing the queue to delete
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
- /**
- * Binds a queue to an exchange. The exact semantics of this
- * (in particular how 'routing keys' and 'binding arguments'
- * are used) depends on the type of the exchange.
- *
- * @param exchange an Exchange object representing the
- * exchange to bind to
- *
- * @param queue a Queue object representing the queue to be
- * bound
- *
- * @param key the 'routing key' for the binding
- *
- * @param args the 'binding arguments' for the binding
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void bind(const Exchange& exchange, const Queue& queue, const std::string& key,
- const framing::FieldTable& args, bool synch = true);
- /**
- * Creates a 'consumer' for a queue. Messages in (or arriving
- * at) that queue will be delivered to consumers
- * asynchronously.
- *
- * @param queue a Queue instance representing the queue to
- * consume from
- *
- * @param tag an identifier to associate with the consumer
- * that can be used to cancel its subscription (if empty, this
- * will be assigned by the broker)
- *
- * @param listener a pointer to an instance of an
- * implementation of the MessageListener interface. Messages
- * received from this queue for this consumer will result in
- * invocation of the received() method on the listener, with
- * the message itself passed in.
- *
- * @param ackMode the mode of acknowledgement that the broker
- * should assume for this consumer. @see ack_modes
- *
- * @param noLocal if true, this consumer will not be sent any
- * message published by this connection
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- int ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const framing::FieldTable* fields = 0);
+ /**
+ * Declares an exchange.
+ *
+ * In AMQP Exchanges are the destinations to which messages
+ * are published. They have Queues bound to them and route
+ * messages they receive to those queues. The routing rules
+ * depend on the type of the exchange.
+ *
+ * @param exchange an Exchange object representing the
+ * exchange to declare
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void declareExchange(Exchange& exchange, bool synch = true);
+ /**
+ * Deletes an exchange
+ *
+ * @param exchange an Exchange object representing the exchange to delete
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void deleteExchange(Exchange& exchange, bool synch = true);
+ /**
+ * Declares a Queue
+ *
+ * @param queue a Queue object representing the queue to declare
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void declareQueue(Queue& queue, bool synch = true);
+ /**
+ * Deletes a Queue
+ *
+ * @param queue a Queue object representing the queue to delete
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
+ /**
+ * Binds a queue to an exchange. The exact semantics of this
+ * (in particular how 'routing keys' and 'binding arguments'
+ * are used) depends on the type of the exchange.
+ *
+ * @param exchange an Exchange object representing the
+ * exchange to bind to
+ *
+ * @param queue a Queue object representing the queue to be
+ * bound
+ *
+ * @param key the 'routing key' for the binding
+ *
+ * @param args the 'binding arguments' for the binding
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void bind(const Exchange& exchange, const Queue& queue, const std::string& key,
+ const framing::FieldTable& args, bool synch = true);
+ /**
+ * Creates a 'consumer' for a queue. Messages in (or arriving
+ * at) that queue will be delivered to consumers
+ * asynchronously.
+ *
+ * @param queue a Queue instance representing the queue to
+ * consume from
+ *
+ * @param tag an identifier to associate with the consumer
+ * that can be used to cancel its subscription (if empty, this
+ * will be assigned by the broker)
+ *
+ * @param listener a pointer to an instance of an
+ * implementation of the MessageListener interface. Messages
+ * received from this queue for this consumer will result in
+ * invocation of the received() method on the listener, with
+ * the message itself passed in.
+ *
+ * @param ackMode the mode of acknowledgement that the broker
+ * should assume for this consumer. @see ack_modes
+ *
+ * @param noLocal if true, this consumer will not be sent any
+ * message published by this connection
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ int ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+ const framing::FieldTable* fields = 0);
- /**
- * Cancels a subscription previously set up through a call to consume().
- *
- * @param tag the identifier used (or assigned) in the consume
- * request that set up the subscription to be cancelled.
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void cancel(std::string& tag, bool synch = true);
- /**
- * Synchronous pull of a message from a queue.
- *
- * @param msg a message object that will contain the message
- * headers and content if the call completes.
- *
- * @param queue the queue to consume from
- *
- * @param ackMode the acknowledgement mode to use (@see
- * ack_modes)
- *
- * @return true if a message was succcessfully dequeued from
- * the queue, false if the queue was empty.
- */
- bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
- /**
- * Publishes (i.e. sends a message to the broker).
- *
- * @param msg the message to publish
- *
- * @param exchange the exchange to publish the message to
- *
- * @param routingKey the routing key to publish with
- *
- * @param mandatory if true and the exchange to which this
- * publish is directed has no matching bindings, the message
- * will be returned (see setReturnedMessageHandler()).
- *
- * @param immediate if true and there is no consumer to
- * receive this message on publication, the message will be
- * returned (see setReturnedMessageHandler()).
- */
- void publish(Message& msg, const Exchange& exchange, const std::string& routingKey,
- bool mandatory = false, bool immediate = false);
+ /**
+ * Cancels a subscription previously set up through a call to consume().
+ *
+ * @param tag the identifier used (or assigned) in the consume
+ * request that set up the subscription to be cancelled.
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void cancel(const std::string& tag, bool synch = true);
+ /**
+ * Synchronous pull of a message from a queue.
+ *
+ * @param msg a message object that will contain the message
+ * headers and content if the call completes.
+ *
+ * @param queue the queue to consume from
+ *
+ * @param ackMode the acknowledgement mode to use (@see
+ * ack_modes)
+ *
+ * @return true if a message was succcessfully dequeued from
+ * the queue, false if the queue was empty.
+ */
+ bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
+ /**
+ * Publishes (i.e. sends a message to the broker).
+ *
+ * @param msg the message to publish
+ *
+ * @param exchange the exchange to publish the message to
+ *
+ * @param routingKey the routing key to publish with
+ *
+ * @param mandatory if true and the exchange to which this
+ * publish is directed has no matching bindings, the message
+ * will be returned (see setReturnedMessageHandler()).
+ *
+ * @param immediate if true and there is no consumer to
+ * receive this message on publication, the message will be
+ * returned (see setReturnedMessageHandler()).
+ */
+ void publish(Message& msg, const Exchange& exchange, const std::string& routingKey,
+ bool mandatory = false, bool immediate = false);
- /**
- * For a transactional channel this will commit all
- * publications and acknowledgements since the last commit (or
- * the channel was opened if there has been no previous
- * commit). This will cause published messages to become
- * available to consumers and acknowledged messages to be
- * consumed and removed from the queues they were dispatched
- * from.
- *
- * Transactionailty of a channel is specified when the channel
- * object is created (@see Channel()).
- */
- void commit();
- /**
- * For a transactional channel, this will rollback any
- * publications or acknowledgements. It will be as if the
- * ppblished messages were never sent and the acknowledged
- * messages were never consumed.
- */
- void rollback();
+ /**
+ * For a transactional channel this will commit all
+ * publications and acknowledgements since the last commit (or
+ * the channel was opened if there has been no previous
+ * commit). This will cause published messages to become
+ * available to consumers and acknowledged messages to be
+ * consumed and removed from the queues they were dispatched
+ * from.
+ *
+ * Transactionailty of a channel is specified when the channel
+ * object is created (@see Channel()).
+ */
+ void commit();
+ /**
+ * For a transactional channel, this will rollback any
+ * publications or acknowledgements. It will be as if the
+ * ppblished messages were never sent and the acknowledged
+ * messages were never consumed.
+ */
+ void rollback();
- /**
- * Change the prefetch in use.
- */
- void setPrefetch(u_int16_t prefetch);
+ /**
+ * Change the prefetch in use.
+ */
+ void setPrefetch(u_int16_t prefetch);
- /**
- * Start message dispatching on a new thread
- */
- void start();
- /**
- * Do message dispatching on this thread
- */
- void run();
+ /**
+ * Start message dispatching on a new thread
+ */
+ void start();
- /**
- * Closes a channel, stopping any message dispatching.
- */
- void close();
+ // TODO aconway 2007-01-26: Can it be private?
+ /**
+ * Dispatch messages on this channel in the calling thread.
+ */
+ void run();
- /**
- * Set a handler for this channel that will process any
- * returned messages
- *
- * @see publish()
- */
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+ /**
+ * Close the channel with optional error information.
+ * Closing a channel that is not open has no effect.
+ */
+ void close(
+ framing::ReplyCode = 200, const std::string& =OK,
+ framing::ClassId = 0, framing::MethodId = 0);
- friend class Connection;
- };
+ /**
+ * Set a handler for this channel that will process any
+ * returned messages
+ *
+ * @see publish()
+ */
+ void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+};
}
}
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp
index 1ae317db62..19d5cce7db 100644
--- a/cpp/lib/client/Connection.cpp
+++ b/cpp/lib/client/Connection.cpp
@@ -18,35 +18,46 @@
* under the License.
*
*/
+#include <boost/format.hpp>
+
#include <Connection.h>
#include <ClientChannel.h>
#include <ClientMessage.h>
#include <QpidError.h>
#include <iostream>
+#include <sstream>
#include <MethodBodyInstances.h>
-using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
using namespace qpid::sys;
-u_int16_t Connection::channelIdCounter;
+
+namespace qpid {
+namespace client {
+
+ChannelId Connection::channelIdCounter;
+
+const std::string Connection::OK("OK");
Connection::Connection(
bool debug, u_int32_t _max_frame_size,
- qpid::framing::ProtocolVersion* _version
+ const framing::ProtocolVersion& _version
) : max_frame_size(_max_frame_size), closed(true),
- version(_version->getMajor(),_version->getMinor())
+ version(_version)
{
- connector = new Connector(
- version, requester, responder, debug, _max_frame_size);
+ connector = new Connector(version, debug, _max_frame_size);
}
Connection::~Connection(){
delete connector;
}
-void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){
+void Connection::open(
+ const std::string& _host, int _port, const std::string& uid,
+ const std::string& pwd, const std::string& virtualhost)
+{
+
host = _host;
port = _port;
connector->setInputHandler(this);
@@ -55,197 +66,69 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui
out = connector->getOutputHandler();
connector->connect(host, port);
- ProtocolInitiation* header = new ProtocolInitiation(version);
- responses.expect();
- connector->init(header);
- responses.receive(method_bodies.connection_start);
-
- FieldTable props;
- string mechanism("PLAIN");
- string response = ((char)0) + uid + ((char)0) + pwd;
- string locale("en_US");
- responses.expect();
- out->send(new AMQFrame(version, 0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));
-
- /**
- * Assume for now that further challenges will not be required
- //receive connection.secure
- responses.receive(connection_secure));
- //send connection.secure-ok
- out->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
- **/
-
- responses.receive(method_bodies.connection_tune);
-
- ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
- out->send(new AMQFrame(version, 0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
-
- u_int16_t heartbeat = proposal->getHeartbeat();
- connector->setReadTimeout(heartbeat * 2);
- connector->setWriteTimeout(heartbeat);
-
- //send connection.open
- string capabilities;
- string vhost = virtualhost;
- responses.expect();
- out->send(new AMQFrame(version, 0, new ConnectionOpenBody(version, vhost, capabilities, true)));
- //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
- responses.waitForResponse();
- if(responses.validate(method_bodies.connection_open_ok)){
- //ok
- }else if(responses.validate(method_bodies.connection_redirect)){
- //ignore for now
- ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse()));
- std::cout << "Received redirection to " << redirect->getHost() << std::endl;
- }else{
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
- }
-
+ // Open the special channel 0.
+ channels[0] = &channel0;
+ channel0.open(0, *this);
+ channel0.protocolInit(uid, pwd, virtualhost);
}
-void Connection::close(){
- if(!closed){
- u_int16_t code(200);
- string text("Ok");
- u_int16_t classId(0);
- u_int16_t methodId(0);
-
- sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
+void Connection::close(
+ ReplyCode code, const string& msg, ClassId classId, MethodId methodId
+)
+{
+ if(!closed) {
+ channel0.sendAndReceive<ConnectionCloseOkBody>(
+ new ConnectionCloseBody(
+ getVersion(), code, msg, classId, methodId));
connector->close();
}
}
-void Connection::openChannel(Channel* channel){
- channel->con = this;
- channel->id = ++channelIdCounter;
- channel->out = out;
- channels[channel->id] = channel;
- //now send frame to open channel and wait for response
- string oob;
- channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
- channel->setQos();
- channel->closed = false;
-}
-
-void Connection::closeChannel(Channel* channel){
- //send frame to close channel
- u_int16_t code(200);
- string text("Ok");
- u_int16_t classId(0);
- u_int16_t methodId(0);
- closeChannel(channel, code, text, classId, methodId);
+// FIXME aconway 2007-01-26: make channels owned and created by connection?
+void Connection::openChannel(Channel& channel) {
+ ChannelId id = ++channelIdCounter;
+ assert (channels.find(id) == channels.end());
+ assert(out);
+ channels[id] = &channel;
+ channel.open(id, *this);
}
-void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){
- //send frame to close channel
- channel->cancelAll();
- channel->closed = true;
- channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok);
- channel->con = 0;
- channel->out = 0;
- removeChannel(channel);
-}
-
-void Connection::removeChannel(Channel* channel){
- //send frame to close channel
-
- channels.erase(channel->id);
- channel->out = 0;
- channel->id = 0;
- channel->con = 0;
+void Connection::erase(ChannelId id) {
+ channels.erase(id);
}
void Connection::received(AMQFrame* frame){
- AMQBody::shared_ptr body = frame->getBody();
- u_int8_t type = body->type();
- if (type == REQUEST_BODY)
- responder.received(AMQRequestBody::getData(body));
- handleFrame(frame);
- if (type == RESPONSE_BODY)
- requester.processed(AMQResponseBody::getData(body));
-}
-
-void Connection::handleFrame(AMQFrame* frame){
- u_int16_t channelId = frame->getChannel();
-
- if(channelId == 0){
- this->handleBody(frame->getBody());
- }else{
- Channel* channel = channels[channelId];
- if(channel == 0){
- error(504, "Unknown channel");
- }else{
- try{
- channel->handleBody(frame->getBody());
- }catch(qpid::QpidError e){
- channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
- }
- }
+ // FIXME aconway 2007-01-25: Mutex
+ ChannelId id = frame->getChannel();
+ Channel* channel = channels[id];
+ // FIXME aconway 2007-01-26: Exception thrown here is hanging the
+ // client. Need to review use of exceptions.
+ if (channel == 0)
+ THROW_QPID_ERROR(
+ PROTOCOL_ERROR+504,
+ (boost::format("Invalid channel number %g") % id).str());
+ try{
+ channel->handleBody(frame->getBody());
+ }catch(const qpid::QpidError& e){
+ channelException(
+ *channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
}
}
-void Connection::handleRequest(AMQRequestBody::shared_ptr body) {
- // FIXME aconway 2007-01-19: request/response handling.
- handleMethod(body);
-}
-
-void Connection::handleResponse(AMQResponseBody::shared_ptr body) {
- // FIXME aconway 2007-01-19: request/response handling.
- handleMethod(body);
-}
-
-void Connection::handleMethod(AMQMethodBody::shared_ptr body){
- //connection.close, basic.deliver, basic.return or a response to a synchronous request
- if(responses.isWaiting()){
- responses.signalResponse(body);
- }else if(method_bodies.connection_close.match(body.get())){
- //send back close ok
- //close socket
- ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get());
- std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl;
- connector->close();
- }else{
- std::cout << "Unhandled method for connection: " << *body << std::endl;
- error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId());
- }
-}
-
-void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){
- error(504, "Channel error: received header body with channel 0.");
-}
-
-void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){
- error(504, "Channel error: received content body with channel 0.");
-}
-
-void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
-}
-
-void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
- responses.expect();
+void Connection::send(AMQFrame* frame) {
out->send(frame);
- responses.receive(body);
}
-void Connection::error(int code, const string& msg, int classid, int methodid){
- std::cout << "Connection exception generated: " << code << msg;
- if(classid || methodid){
- std::cout << " [" << methodid << ":" << classid << "]";
- }
- std::cout << std::endl;
- sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
- connector->close();
-}
-
-void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){
- std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl;
- int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500;
+void Connection::channelException(
+ Channel& channel, AMQMethodBody* method, const QpidError& e)
+{
+ int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500;
string msg = e.msg;
- if(method == 0){
- closeChannel(channel, code, msg);
- }else{
- closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId());
- }
+ if(method == 0)
+ channel.close(code, msg);
+ else
+ channel.close(
+ code, msg, method->amqpClassId(), method->amqpMethodId());
}
void Connection::idleIn(){
@@ -259,9 +142,12 @@ void Connection::idleOut(){
void Connection::shutdown(){
closed = true;
- //close all channels
- for(iterator i = channels.begin(); i != channels.end(); i++){
- i->second->stop();
+ //close all channels, also removes them from the map.
+ while(!channels.empty()){
+ Channel* channel = channels.begin()->second;
+ if (channel != 0)
+ channel->close();
}
- responses.signalResponse(AMQMethodBody::shared_ptr());
}
+
+}} // namespace qpid::client
diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h
index 9c9b067f88..6ee9e62e47 100644
--- a/cpp/lib/client/Connection.h
+++ b/cpp/lib/client/Connection.h
@@ -1,3 +1,6 @@
+#ifndef _Connection_
+#define _Connection_
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,15 +23,15 @@
*/
#include <map>
#include <string>
+#include <boost/shared_ptr.hpp>
-#ifndef _Connection_
-#define _Connection_
-
+#include "amqp_types.h"
#include <QpidError.h>
#include <Connector.h>
#include <sys/ShutdownHandler.h>
#include <sys/TimeoutHandler.h>
+#include "framing/amqp_types.h"
#include <framing/amqp_framing.h>
#include <ClientExchange.h>
#include <IncomingMessage.h>
@@ -37,150 +40,152 @@
#include <ClientQueue.h>
#include <ResponseHandler.h>
#include <AMQP_HighestVersion.h>
-#include "Requester.h"
-#include "Responder.h"
+#include "ClientChannel.h"
namespace qpid {
+/**
+ * The client namespace contains all classes that make up a client
+ * implementation of the AMQP protocol. The key classes that form
+ * the basis of the client API to be used by applications are
+ * Connection and Channel.
+ */
+namespace client {
+
+class Channel;
+
+/**
+ * \internal provide access to selected private channel functions
+ * for the Connection without making it a friend of the entire channel.
+ */
+class ConnectionForChannel :
+ public framing::InputHandler,
+ public framing::OutputHandler,
+ public sys::TimeoutHandler,
+ public sys::ShutdownHandler
+
+{
+ private:
+ friend class Channel;
+ virtual void erase(framing::ChannelId) = 0;
+};
+
+
+/**
+ * \defgroup clientapi Application API for an AMQP client
+ */
+
+/**
+ * Represents a connection to an AMQP broker. All communication is
+ * initiated by establishing a connection, then opening one or
+ * more Channels over that connection.
+ *
+ * \ingroup clientapi
+ */
+class Connection : public ConnectionForChannel
+{
+ typedef std::map<framing::ChannelId, Channel*> ChannelMap;
+
+ static framing::ChannelId channelIdCounter;
+ static const std::string OK;
+
+ std::string host;
+ int port;
+ const u_int32_t max_frame_size;
+ ChannelMap channels;
+ Connector* connector;
+ framing::OutputHandler* out;
+ volatile bool closed;
+ framing::ProtocolVersion version;
+
+ void erase(framing::ChannelId);
+ void channelException(
+ Channel&, framing::AMQMethodBody*, const QpidError&);
+ Channel channel0;
+
+ // TODO aconway 2007-01-26: too many friendships, untagle these classes.
+ friend class Channel;
+
+ public:
+ const framing::ProtocolVersion& getVersion() const { return version; }
+
+ /**
+ * Creates a connection object, but does not open the
+ * connection.
+ *
+ * @param _version the version of the protocol to connect with
+ *
+ * @param debug turns on tracing for the connection
+ * (i.e. prints details of the frames sent and received to std
+ * out). Optional and defaults to false.
+ *
+ * @param max_frame_size the maximum frame size that the
+ * client will accept. Optional and defaults to 65536.
+ */
+ Connection(
+ bool debug = false, u_int32_t max_frame_size = 65536,
+ const framing::ProtocolVersion& = framing::highestProtocolVersion);
+ ~Connection();
+
/**
- * The client namespace contains all classes that make up a client
- * implementation of the AMQP protocol. The key classes that form
- * the basis of the client API to be used by applications are
- * Connection and Channel.
+ * Opens a connection to a broker.
+ *
+ * @param host the host on which the broker is running
+ *
+ * @param port the port on the which the broker is listening
+ *
+ * @param uid the userid to connect with
+ *
+ * @param pwd the password to connect with (currently SASL
+ * PLAIN is the only authentication method supported so this
+ * is sent in clear text)
+ *
+ * @param virtualhost the AMQP virtual host to use (virtual
+ * hosts, where implemented(!), provide namespace partitioning
+ * within a single broker).
*/
-namespace client {
+ void open(const std::string& host, int port = 5672,
+ const std::string& uid = "guest", const std::string& pwd = "guest",
+ const std::string& virtualhost = "/");
- class Channel;
/**
- * \defgroup clientapi Application API for an AMQP client
+ * Close the connection with optional error information for the peer.
+ *
+ * Any further use of this connection (without reopening it) will
+ * not succeed.
*/
+ void close(framing::ReplyCode=200, const std::string& msg=OK,
+ framing::ClassId = 0, framing::MethodId = 0);
/**
- * Represents a connection to an AMQP broker. All communication is
- * initiated by establishing a connection, then opening one or
- * more Channels over that connection.
+ * Associate a Channel with this connection and open it for use.
+ *
+ * In AMQP channels are like multi-plexed 'sessions' of work over
+ * a connection. Almost all the interaction with AMQP is done over
+ * a channel.
*
- * \ingroup clientapi
+ * @param connection the connection object to be associated with
+ * the channel. Call Channel::close() to close the channel.
+ */
+ void openChannel(Channel&);
+
+
+ // TODO aconway 2007-01-26: can these be private?
+ void send(framing::AMQFrame*);
+ void received(framing::AMQFrame*);
+ void idleOut();
+ void idleIn();
+ void shutdown();
+
+ /**
+ * @return the maximum frame size in use on this connection
*/
- class Connection : public virtual qpid::framing::InputHandler,
- public virtual qpid::sys::TimeoutHandler,
- public virtual qpid::sys::ShutdownHandler,
- private virtual qpid::framing::BodyHandler
- {
-
- typedef std::map<int, Channel*>::iterator iterator;
-
- static u_int16_t channelIdCounter;
-
- std::string host;
- int port;
- const u_int32_t max_frame_size;
- std::map<int, Channel*> channels;
- Connector* connector;
- qpid::framing::OutputHandler* out;
- ResponseHandler responses;
- volatile bool closed;
- framing::ProtocolVersion version;
- framing::Requester requester;
- framing::Responder responder;
-
- void channelException(Channel* channel, framing::AMQMethodBody* body, QpidError& e);
- void error(int code, const std::string& msg, int classid = 0, int methodid = 0);
- void closeChannel(Channel* channel, u_int16_t code, std::string& text, u_int16_t classId = 0, u_int16_t methodId = 0);
- void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body);
-
- // FIXME aconway 2007-01-19: Use channel(0) not connection
- // to handle channel 0 requests. Remove handler methods.
- //
- void handleRequest(framing::AMQRequestBody::shared_ptr);
- void handleResponse(framing::AMQResponseBody::shared_ptr);
- void handleMethod(framing::AMQMethodBody::shared_ptr);
- void handleHeader(framing::AMQHeaderBody::shared_ptr);
- void handleContent(framing::AMQContentBody::shared_ptr);
- void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr);
- void handleFrame(framing::AMQFrame* frame);
-
- public:
- /**
- * Creates a connection object, but does not open the
- * connection.
- *
- * @param _version the version of the protocol to connect with
- *
- * @param debug turns on tracing for the connection
- * (i.e. prints details of the frames sent and received to std
- * out). Optional and defaults to false.
- *
- * @param max_frame_size the maximum frame size that the
- * client will accept. Optional and defaults to 65536.
- */
- Connection( bool debug = false, u_int32_t max_frame_size = 65536,
- framing::ProtocolVersion* _version = &(framing::highestProtocolVersion));
- ~Connection();
-
- /**
- * Opens a connection to a broker.
- *
- * @param host the host on which the broker is running
- *
- * @param port the port on the which the broker is listening
- *
- * @param uid the userid to connect with
- *
- * @param pwd the password to connect with (currently SASL
- * PLAIN is the only authentication method supported so this
- * is sent in clear text)
- *
- * @param virtualhost the AMQP virtual host to use (virtual
- * hosts, where implemented(!), provide namespace partitioning
- * within a single broker).
- */
- void open(const std::string& host, int port = 5672,
- const std::string& uid = "guest", const std::string& pwd = "guest",
- const std::string& virtualhost = "/");
- /**
- * Closes the connection. Any further use of this connection
- * (without reopening it) will not succeed.
- */
- void close();
- /**
- * Opens a Channel. In AMQP channels are like multi-plexed
- * 'sessions' of work over a connection. Almost all the
- * interaction with AMQP is done over a channel.
- *
- * @param channel a pointer to a channel instance that will be
- * used to represent the new channel.
- */
- void openChannel(Channel* channel);
- /*
- * Requests that the server close this channel, then removes
- * the association to the channel from this connection
- *
- * @param channel a pointer to the channel instance to close
- */
- void closeChannel(Channel* channel);
- /*
- * Removes the channel from association with this connection,
- * without sending a close request to the server.
- *
- * @param channel a pointer to the channel instance to
- * disassociate
- */
- void removeChannel(Channel* channel);
-
- virtual void received(framing::AMQFrame* frame);
-
- virtual void idleOut();
- virtual void idleIn();
-
- virtual void shutdown();
-
- /**
- * @return the maximum frame size in use on this connection
- */
- inline u_int32_t getMaxFrameSize(){ return max_frame_size; }
- };
+ inline u_int32_t getMaxFrameSize(){ return max_frame_size; }
+
+ /** @return protocol version in use on this connection. */
+ const framing::ProtocolVersion& getVersion() { return version; }
+};
}
}
diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp
index d05540ba32..425cecaf6f 100644
--- a/cpp/lib/client/Connector.cpp
+++ b/cpp/lib/client/Connector.cpp
@@ -22,8 +22,6 @@
#include <QpidError.h>
#include <sys/Time.h>
#include "Connector.h"
-#include "Requester.h"
-#include "Responder.h"
using namespace qpid::sys;
using namespace qpid::client;
@@ -31,7 +29,6 @@ using namespace qpid::framing;
using qpid::QpidError;
Connector::Connector(const qpid::framing::ProtocolVersion& pVersion,
- Requester& req, Responder& resp,
bool _debug, u_int32_t buffer_size) :
debug(_debug),
receive_buffer_size(buffer_size),
@@ -44,9 +41,7 @@ Connector::Connector(const qpid::framing::ProtocolVersion& pVersion,
timeoutHandler(0),
shutdownHandler(0),
inbuf(receive_buffer_size),
- outbuf(send_buffer_size),
- requester(req),
- responder(resp)
+ outbuf(send_buffer_size)
{ }
Connector::~Connector(){ }
@@ -58,9 +53,9 @@ void Connector::connect(const std::string& host, int port){
receiver = Thread(this);
}
-void Connector::init(ProtocolInitiation* header){
- writeBlock(header);
- delete header;
+void Connector::init(){
+ ProtocolInitiation init(version);
+ writeBlock(&init);
}
void Connector::close(){
@@ -81,16 +76,11 @@ OutputHandler* Connector::getOutputHandler(){
return this;
}
-void Connector::send(AMQFrame* frame){
+void Connector::send(AMQFrame* f){
+ std::auto_ptr<AMQFrame> frame(f);
AMQBody::shared_ptr body = frame->getBody();
- u_int8_t type = body->type();
- if (type == REQUEST_BODY)
- requester.sending(AMQRequestBody::getData(body));
- else if (type == RESPONSE_BODY)
- responder.sending(AMQResponseBody::getData(body));
- writeBlock(frame);
+ writeBlock(frame.get());
if(debug) std::cout << "SENT: " << *frame << std::endl;
- delete frame;
}
void Connector::writeBlock(AMQDataBlock* data){
@@ -185,10 +175,8 @@ void Connector::run(){
inbuf.compact();
}
}
- }catch(QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg
- << " (" << error.location.file << ":" << error.location.line
- << ")" << std::endl;
+ } catch (const std::exception& e) {
+ std::cout << e.what() << std::endl;
handleClosed();
}
}
diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h
index 02926b2bdb..40663486f2 100644
--- a/cpp/lib/client/Connector.h
+++ b/cpp/lib/client/Connector.h
@@ -35,13 +35,6 @@
namespace qpid {
-namespace framing {
-
-class Requester;
-class Responder;
-
-} // namespace framing
-
namespace client {
class Connector : public qpid::framing::OutputHandler,
@@ -74,9 +67,6 @@ class Connector : public qpid::framing::OutputHandler,
qpid::sys::Socket socket;
- qpid::framing::Requester& requester;
- qpid::framing::Responder& responder;
-
void checkIdle(ssize_t status);
void writeBlock(qpid::framing::AMQDataBlock* data);
void writeToSocket(char* data, size_t available);
@@ -85,13 +75,13 @@ class Connector : public qpid::framing::OutputHandler,
void run();
void handleClosed();
+ friend class Channel;
public:
Connector(const qpid::framing::ProtocolVersion& pVersion,
- qpid::framing::Requester& req, qpid::framing::Responder& resp,
bool debug = false, u_int32_t buffer_size = 1024);
virtual ~Connector();
virtual void connect(const std::string& host, int port);
- virtual void init(qpid::framing::ProtocolInitiation* header);
+ virtual void init();
virtual void close();
virtual void setInputHandler(qpid::framing::InputHandler* handler);
virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp
index 68c7e52013..8950138f5e 100644
--- a/cpp/lib/client/ResponseHandler.cpp
+++ b/cpp/lib/client/ResponseHandler.cpp
@@ -18,27 +18,35 @@
* under the License.
*
*/
+#include <boost/format.hpp>
+
#include <ResponseHandler.h>
#include <sys/Monitor.h>
#include <QpidError.h>
+#include "amqp_types.h"
using namespace qpid::sys;
+using namespace qpid::framing;
+
+namespace qpid {
+namespace client {
-qpid::client::ResponseHandler::ResponseHandler() : waiting(false){}
+ResponseHandler::ResponseHandler() : waiting(false){}
-qpid::client::ResponseHandler::~ResponseHandler(){}
+ResponseHandler::~ResponseHandler(){}
-bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){
- return response != 0 && expected.match(response.get());
+bool ResponseHandler::validate(ClassId c, MethodId m) {
+ return response != 0 &&
+ response->amqpClassId() ==c && response->amqpMethodId() == m;
}
-void qpid::client::ResponseHandler::waitForResponse(){
+void ResponseHandler::waitForResponse(){
Monitor::ScopedLock l(monitor);
while (waiting)
monitor.wait();
}
-void qpid::client::ResponseHandler::signalResponse(
+void ResponseHandler::signalResponse(
qpid::framing::AMQMethodBody::shared_ptr _response)
{
Monitor::ScopedLock l(monitor);
@@ -47,15 +55,26 @@ void qpid::client::ResponseHandler::signalResponse(
monitor.notify();
}
-void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){
+void ResponseHandler::receive(ClassId c, MethodId m) {
Monitor::ScopedLock l(monitor);
while (waiting)
monitor.wait();
- if(!validate(expected)){
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error");
+ if (!response) {
+ THROW_QPID_ERROR(
+ PROTOCOL_ERROR, "Channel closed unexpectedly.");
+ }
+ if(!validate(response->amqpClassId(), response->amqpMethodId())) {
+ THROW_QPID_ERROR(
+ PROTOCOL_ERROR,
+ (boost::format(
+ "Expected class:method %d:%d, got %d:%d")
+ % c % m % response->amqpClassId() % response->amqpMethodId()
+ ).str());
}
}
-void qpid::client::ResponseHandler::expect(){
+void ResponseHandler::expect(){
waiting = true;
}
+
+}} // namespace qpid::client
diff --git a/cpp/lib/client/ResponseHandler.h b/cpp/lib/client/ResponseHandler.h
index c3d499d046..c402bcbc65 100644
--- a/cpp/lib/client/ResponseHandler.h
+++ b/cpp/lib/client/ResponseHandler.h
@@ -19,6 +19,8 @@
*
*/
#include <string>
+
+#include <amqp_types.h>
#include <framing/amqp_framing.h>
#include <sys/Monitor.h>
@@ -26,26 +28,39 @@
#define _ResponseHandler_
namespace qpid {
- namespace client {
-
- class ResponseHandler{
- bool waiting;
- qpid::framing::AMQMethodBody::shared_ptr response;
- qpid::sys::Monitor monitor;
-
- public:
- ResponseHandler();
- ~ResponseHandler();
- inline bool isWaiting(){ return waiting; }
- inline qpid::framing::AMQMethodBody::shared_ptr getResponse(){ return response; }
- bool validate(const qpid::framing::AMQMethodBody& expected);
- void waitForResponse();
- void signalResponse(qpid::framing::AMQMethodBody::shared_ptr response);
- void receive(const qpid::framing::AMQMethodBody& expected);
- void expect();//must be called before calling receive
- };
+namespace client {
+
+/**
+ * Holds a response from the broker peer for the client.
+ */
+class ResponseHandler{
+ bool waiting;
+ qpid::framing::AMQMethodBody::shared_ptr response;
+ qpid::sys::Monitor monitor;
+
+ public:
+ ResponseHandler();
+ ~ResponseHandler();
+
+ bool isWaiting(){ return waiting; }
+ framing::AMQMethodBody::shared_ptr getResponse(){ return response;}
+ void waitForResponse();
+
+ void signalResponse(framing::AMQMethodBody::shared_ptr response);
+
+ void expect();//must be called before calling receive
+ bool validate(framing::ClassId, framing::MethodId);
+ void receive(framing::ClassId, framing::MethodId);
+ template <class BodyType> bool validate() {
+ return validate(BodyType::CLASS_ID, BodyType::METHOD_ID);
}
+ template <class BodyType> void receive() {
+ return receive(BodyType::CLASS_ID, BodyType::METHOD_ID);
+ }
+};
+
+}
}