From 82e07bb30905feb2c11bb6d9f3624f976ab070a5 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Thu, 21 Sep 2006 16:19:13 +0000 Subject: Use the generated proxy classes rather than constructing frames directly. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@448597 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/broker/inc/SessionHandlerImpl.h | 2 + cpp/broker/src/SessionHandlerImpl.cpp | 39 +++++------ .../framing/generated/stylesheets/amqp_client.xsl | 26 +++---- .../framing/generated/stylesheets/amqp_server.xsl | 80 +++++++++++++++------- 4 files changed, 89 insertions(+), 58 deletions(-) (limited to 'cpp') diff --git a/cpp/broker/inc/SessionHandlerImpl.h b/cpp/broker/inc/SessionHandlerImpl.h index 14a6404c78..167bf0cc23 100644 --- a/cpp/broker/inc/SessionHandlerImpl.h +++ b/cpp/broker/inc/SessionHandlerImpl.h @@ -23,6 +23,7 @@ #include #include #include "AMQFrame.h" +#include "AMQP_ClientProxy.h" #include "AMQP_ServerOperations.h" #include "AutoDelete.h" #include "ExchangeRegistry.h" @@ -64,6 +65,7 @@ class SessionHandlerImpl : public virtual qpid::io::SessionHandler, typedef std::vector::iterator queue_iterator; qpid::io::SessionContext* context; + qpid::framing::AMQP_ClientProxy client; QueueRegistry* queues; ExchangeRegistry* const exchanges; AutoDelete* const cleaner; diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp index 19e243a01b..a75b8fcf0f 100644 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -40,6 +40,7 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, basicHandler(new BasicHandlerImpl(this)), exchangeHandler(new ExchangeHandlerImpl(this)), queueHandler(new QueueHandlerImpl(this)), + client(context), framemax(65536), heartbeat(0){ @@ -90,9 +91,9 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ }catch(ChannelException& e){ channels[channel]->close(); channels.erase(channel); - context->send(new AMQFrame(channel, new ChannelCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId()))); + client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); }catch(ConnectionException& e){ - context->send(new AMQFrame(0, new ConnectionCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId()))); + client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); } break; @@ -116,7 +117,7 @@ void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){ FieldTable properties; string mechanisms("PLAIN"); string locales("en_US"); - context->send(new AMQFrame(0, new ConnectionStartBody(8, 0, properties, mechanisms, locales))); + client.getConnection().start(0, 8, 0, properties, mechanisms, locales); } void SessionHandlerImpl::idleOut(){ @@ -156,7 +157,7 @@ void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ void SessionHandlerImpl::ConnectionHandlerImpl::startOk(u_int16_t channel, FieldTable& clientProperties, string& mechanism, string& response, string& locale){ - parent->context->send(new AMQFrame(0, new ConnectionTuneBody(100, parent->framemax, parent->heartbeat))); + parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat); } void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t channel, string& response){} @@ -168,13 +169,13 @@ void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t channel, u_int1 void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist){ string knownhosts; - parent->context->send(new AMQFrame(0, new ConnectionOpenOkBody(knownhosts))); + parent->client.getConnection().openOk(0, knownhosts); } void SessionHandlerImpl::ConnectionHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId, u_int16_t methodId){ - parent->context->send(new AMQFrame(0, new ConnectionCloseOkBody())); + parent->client.getConnection().closeOk(0); parent->context->close(); } @@ -186,7 +187,7 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t channel){ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& outOfBand){ parent->channels[channel] = new Channel(parent->context, channel, parent->framemax); - parent->context->send(new AMQFrame(channel, new ChannelOpenOkBody())); + parent->client.getChannel().openOk(channel); } void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t channel, bool active){} @@ -198,7 +199,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t parent->channels.erase(channel); c->close(); delete c; - parent->context->send(new AMQFrame(channel, new ChannelCloseOkBody())); + parent->client.getChannel().closeOk(channel); } void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t channel){} @@ -230,7 +231,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16 } parent->exchanges->getLock()->release(); if(!nowait){ - parent->context->send(new AMQFrame(channel, new ExchangeDeclareOkBody())); + parent->client.getExchange().declareOk(channel); } } @@ -239,11 +240,8 @@ void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16 parent->exchanges->getLock()->acquire(); parent->exchanges->destroy(exchange); parent->exchanges->getLock()->release(); - if(!nowait) parent->context->send(new AMQFrame(channel, new ExchangeDeleteOkBody())); + if(!nowait) parent->client.getExchange().deleteOk(channel); } - - - void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t ticket, string& name, bool passive, bool durable, bool exclusive, @@ -271,8 +269,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t } if(!nowait){ name = queue->getName(); - QueueDeclareOkBody* response = new QueueDeclareOkBody(name, queue->getMessageCount(), queue->getConsumerCount()); - parent->context->send(new AMQFrame(channel, response)); + parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount()); } } @@ -285,7 +282,7 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t tic if(exchange){ if(routingKey.size() == 0 && queueName.size() == 0) routingKey = queue->getName(); exchange->bind(queue, routingKey, &arguments); - if(!nowait) parent->context->send(new AMQFrame(channel, new QueueBindOkBody())); + if(!nowait) parent->client.getQueue().bindOk(channel); }else{ throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); } @@ -295,7 +292,7 @@ void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t ti Queue::shared_ptr queue = parent->getQueue(queueName, channel); int count = queue->purge(); - if(!nowait) parent->context->send(new AMQFrame(channel, new QueuePurgeOkBody(count))); + if(!nowait) parent->client.getQueue().purgeOk(channel, count); } void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t ticket, string& queue, @@ -316,7 +313,7 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t count = q->getMessageCount(); parent->queues->destroy(queue); } - if(!nowait) parent->context->send(new AMQFrame(channel, new QueueDeleteOkBody(count))); + if(!nowait) parent->client.getQueue().deleteOk(channel, count); } @@ -327,7 +324,7 @@ void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t pref //TODO: channel doesn't do anything with these qos parameters yet parent->channels[channel]->setPrefetchSize(prefetchSize); parent->channels[channel]->setPrefetchCount(prefetchCount); - parent->context->send(new AMQFrame(channel, new BasicQosOkBody())); + parent->client.getBasic().qosOk(channel); } void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t ticket, @@ -344,7 +341,7 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ try{ channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0); - if(!nowait) parent->context->send(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag))); + if(!nowait) parent->client.getBasic().consumeOk(channelId, consumerTag); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); @@ -357,7 +354,7 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){ parent->channels[channel]->cancel(consumerTag); - if(!nowait) parent->context->send(new AMQFrame(channel, new BasicCancelOkBody(consumerTag))); + if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag); } void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t ticket, diff --git a/cpp/common/framing/generated/stylesheets/amqp_client.xsl b/cpp/common/framing/generated/stylesheets/amqp_client.xsl index f0fa3d7890..13a912a926 100644 --- a/cpp/common/framing/generated/stylesheets/amqp_client.xsl +++ b/cpp/common/framing/generated/stylesheets/amqp_client.xsl @@ -11,11 +11,11 @@ --> - + -#ifndef _AMQP_Client_ -#define _AMQP_Client_ +#ifndef _AMQP_ServerProxy_ +#define _AMQP_ServerProxy_ #include "AMQP_ServerOperations.h" #include "FieldTable.h" @@ -24,13 +24,13 @@ namespace qpid { namespace framing { -class AMQP_Client : virtual public AMQP_ServerOperations +class AMQP_ServerProxy : virtual public AMQP_ServerOperations { OutputHandler* out; public: - AMQP_Client(OutputHandler* _out); - virtual ~AMQP_Client() {} + AMQP_ServerProxy(OutputHandler* _out); + virtual ~AMQP_ServerProxy() {} @@ -76,7 +76,7 @@ class AMQP_Client : virtual public AMQP_ServerOperations }; /* class */ - }; /* class AMQP_Client */ + }; /* class AMQP_ServerProxy */ } /* namespace framing */ } /* namespace qpid */ @@ -94,16 +94,16 @@ class AMQP_Client : virtual public AMQP_ServerOperations --> - + -#include "AMQP_Client.h" +#include "AMQP_ServerProxy.h" namespace qpid { namespace framing { -AMQP_Client::AMQP_Client(OutputHandler* _out) : +AMQP_ServerProxy::AMQP_ServerProxy(OutputHandler* _out) : out(_out) { } @@ -111,15 +111,15 @@ AMQP_Client::AMQP_Client(OutputHandler* _out) : /* ++++++++++ Class: ++++++++++ */ -AMQP_Client::::(OutputHandler* _out) : +AMQP_ServerProxy::::(OutputHandler* _out) : out(_out) { } -AMQP_Client::::~() {} +AMQP_ServerProxy::::~() {} - void AMQP_Client:::: + void AMQP_ServerProxy:::: ( u_int16_t channel , diff --git a/cpp/common/framing/generated/stylesheets/amqp_server.xsl b/cpp/common/framing/generated/stylesheets/amqp_server.xsl index 4ad29a4b95..5ff8994888 100644 --- a/cpp/common/framing/generated/stylesheets/amqp_server.xsl +++ b/cpp/common/framing/generated/stylesheets/amqp_server.xsl @@ -11,11 +11,10 @@ --> - + - -#ifndef _AMQP_Server_ -#define _AMQP_Server_ +#ifndef _AMQP_ClientProxy_ +#define _AMQP_ClientProxy_ #include "AMQP_ClientOperations.h" #include "FieldTable.h" @@ -24,14 +23,15 @@ namespace qpid { namespace framing { -class AMQP_Server : virtual public AMQP_ClientOperations +class AMQP_ClientProxy : virtual public AMQP_ClientOperations { - OutputHandler* out; - public: - AMQP_Server(OutputHandler* _out); - virtual ~AMQP_Server() {} - + + AMQP_ClientProxy(OutputHandler* _out); + virtual ~AMQP_ClientProxy() {}; + + + /** ===== Class: ===== @@ -74,15 +74,32 @@ class AMQP_Server : virtual public AMQP_ClientOperations ); - }; /* class */ + }; /* class */ - }; /* class AMQP_Server */ + + + + ; + + + private: + + OutputHandler* out; + + + + ; + + + + + }; /* class AMQP_ClientProxy */ } /* namespace framing */ } /* namespace qpid */ -#endif - +#endif + @@ -94,32 +111,39 @@ class AMQP_Server : virtual public AMQP_ClientOperations --> - + - -#include "AMQP_Server.h" +#include "AMQP_ClientProxy.h" namespace qpid { namespace framing { -AMQP_Server::AMQP_Server(OutputHandler* _out) : - out(_out) +AMQP_ClientProxy::AMQP_ClientProxy(OutputHandler* _out) : + out(_out), + + + + , + + + { -} +} + /* ++++++++++ Class: ++++++++++ */ -AMQP_Server::::(OutputHandler* _out) : +AMQP_ClientProxy::::(OutputHandler* _out) : out(_out) { } -AMQP_Server::::~() {} +AMQP_ClientProxy::::~() {} - void AMQP_Server:::: + void AMQP_ClientProxy:::: ( u_int16_t channel , @@ -145,8 +169,16 @@ AMQP_Server::::~< - + + + { + ; + } + + + + } /* namespace framing */ } /* namespace qpid */ -- cgit v1.2.1