diff options
| author | Alan Conway <aconway@apache.org> | 2007-01-15 18:28:29 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-01-15 18:28:29 +0000 |
| commit | 87d97b5bcc5b7fe44cfc71ce28ccfeae1d9b2274 (patch) | |
| tree | 03929353228d063bf8892d6c57d4bf46fa467ddd /cpp/lib/broker/SessionHandlerImpl.cpp | |
| parent | fa15e6d52022cc1576b19e3caaecf66260c1923e (diff) | |
| download | qpid-python-87d97b5bcc5b7fe44cfc71ce28ccfeae1d9b2274.tar.gz | |
* Refactor: Moved major broker components (exchanges, queues etc.) from
class SessionHandlerImplFactory to more logical class Broker.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496425 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/SessionHandlerImpl.cpp')
| -rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.cpp | 58 |
1 files changed, 30 insertions, 28 deletions
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index f34ef59922..905ac83b92 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -26,21 +26,22 @@ #include "assert.h" using namespace boost; -using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; using namespace qpid::sys; -SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, - QueueRegistry* _queues, - ExchangeRegistry* _exchanges, - AutoDelete* _cleaner, - const Settings& _settings) : +namespace qpid { +namespace broker { + +SessionHandlerImpl::SessionHandlerImpl( + SessionContext* _context, Broker& broker) : + context(_context), - queues(_queues), - exchanges(_exchanges), - cleaner(_cleaner), - settings(_settings), + client(0), + queues(broker.getQueues()), + exchanges(broker.getExchanges()), + cleaner(broker.getCleaner()), + settings(broker.getTimeout(), broker.getStagingThreshold()), basicHandler(new BasicHandlerImpl(this)), channelHandler(new ChannelHandlerImpl(this)), connectionHandler(new ConnectionHandlerImpl(this)), @@ -49,10 +50,8 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, txHandler(new TxHandlerImpl(this)), messageHandler(new MessageHandlerImpl(this)), framemax(65536), - heartbeat(0){ - - client =NULL; -} + heartbeat(0) +{} SessionHandlerImpl::~SessionHandlerImpl(){ @@ -75,7 +74,7 @@ Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t cha queue = getChannel(channel)->getDefaultQueue(); if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); } else { - queue = queues->find(name); + queue = queues.find(name); if (queue == 0) { throw ChannelException( 404, "Queue not found: " + name); } @@ -85,7 +84,7 @@ Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t cha Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){ - return exchanges->get(name); + return exchanges.get(name); } void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ @@ -96,8 +95,10 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ switch(body->type()) { case REQUEST_BODY: + // responder.received(frame); case RESPONSE_BODY: - case METHOD_BODY: + // requester.received(frame); + case METHOD_BODY: // method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body); try{ method->invoke(*this, channel); @@ -164,7 +165,7 @@ void SessionHandlerImpl::closed(){ } for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){ string name = (*i)->getName(); - queues->destroy(name); + queues.destroy(name); exclusiveQueues.erase(i); } } catch(std::exception& e) { @@ -221,7 +222,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const strin parent->channels[channel] = new Channel( parent->client->getProtocolVersion() , parent->context, channel, - parent->framemax, parent->queues->getStore(), + parent->framemax, parent->queues.getStore(), parent->settings.stagingThreshold); // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9 @@ -251,12 +252,12 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16 const FieldTable& /*arguments*/){ if(passive){ - if(!parent->exchanges->get(exchange)){ + if(!parent->exchanges.get(exchange)){ throw ChannelException(404, "Exchange not found: " + exchange); } }else{ try{ - std::pair<Exchange::shared_ptr, bool> response = parent->exchanges->declare(exchange, type); + std::pair<Exchange::shared_ptr, bool> response = parent->exchanges.declare(exchange, type); if(!response.second && response.first->getType() != type){ throw ConnectionException(507, "Exchange already declared to be of type " + response.first->getType() + ", requested " + type); @@ -288,7 +289,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16 const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused - parent->exchanges->destroy(exchange); + parent->exchanges.destroy(exchange); if(!nowait) parent->client->getExchange().deleteOk(channel); } @@ -300,7 +301,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t queue = parent->getQueue(name, channel); } else { std::pair<Queue::shared_ptr, bool> queue_created = - parent->queues->declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0); + parent->queues.declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue @@ -310,11 +311,11 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t queue_created.first->create(arguments); //add default binding: - parent->exchanges->getDefault()->bind(queue, name, 0); + parent->exchanges.getDefault()->bind(queue, name, 0); if (exclusive) { parent->exclusiveQueues.push_back(queue); } else if(autoDelete){ - parent->cleaner->add(queue); + parent->cleaner.add(queue); } } } @@ -332,7 +333,7 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t const FieldTable& arguments){ Queue::shared_ptr queue = parent->getQueue(queueName, channel); - Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName); + Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName); if(exchange){ // kpvdr - cannot use this any longer as routingKey is now const // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); @@ -369,7 +370,7 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t } count = q->getMessageCount(); q->destroy(); - parent->queues->destroy(queue); + parent->queues.destroy(queue); } if(!nowait) parent->client->getQueue().deleteOk(channel, count); @@ -424,7 +425,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t const string& exchangeName, const string& routingKey, bool mandatory, bool immediate){ - Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName); + Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges.getDefault() : parent->exchanges.get(exchangeName); if(exchange){ Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate); parent->getChannel(channel)->handlePublish(msg, exchange); @@ -652,3 +653,4 @@ SessionHandlerImpl::MessageHandlerImpl::transfer( u_int16_t /*channel*/, assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } +}} |
