summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/SessionHandlerImpl.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-15 18:28:29 +0000
committerAlan Conway <aconway@apache.org>2007-01-15 18:28:29 +0000
commit87d97b5bcc5b7fe44cfc71ce28ccfeae1d9b2274 (patch)
tree03929353228d063bf8892d6c57d4bf46fa467ddd /cpp/lib/broker/SessionHandlerImpl.cpp
parentfa15e6d52022cc1576b19e3caaecf66260c1923e (diff)
downloadqpid-python-87d97b5bcc5b7fe44cfc71ce28ccfeae1d9b2274.tar.gz
* Refactor: Moved major broker components (exchanges, queues etc.) from
class SessionHandlerImplFactory to more logical class Broker. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496425 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/SessionHandlerImpl.cpp')
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.cpp58
1 files changed, 30 insertions, 28 deletions
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp
index f34ef59922..905ac83b92 100644
--- a/cpp/lib/broker/SessionHandlerImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerImpl.cpp
@@ -26,21 +26,22 @@
#include "assert.h"
using namespace boost;
-using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
using namespace qpid::sys;
-SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
- QueueRegistry* _queues,
- ExchangeRegistry* _exchanges,
- AutoDelete* _cleaner,
- const Settings& _settings) :
+namespace qpid {
+namespace broker {
+
+SessionHandlerImpl::SessionHandlerImpl(
+ SessionContext* _context, Broker& broker) :
+
context(_context),
- queues(_queues),
- exchanges(_exchanges),
- cleaner(_cleaner),
- settings(_settings),
+ client(0),
+ queues(broker.getQueues()),
+ exchanges(broker.getExchanges()),
+ cleaner(broker.getCleaner()),
+ settings(broker.getTimeout(), broker.getStagingThreshold()),
basicHandler(new BasicHandlerImpl(this)),
channelHandler(new ChannelHandlerImpl(this)),
connectionHandler(new ConnectionHandlerImpl(this)),
@@ -49,10 +50,8 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
txHandler(new TxHandlerImpl(this)),
messageHandler(new MessageHandlerImpl(this)),
framemax(65536),
- heartbeat(0){
-
- client =NULL;
-}
+ heartbeat(0)
+{}
SessionHandlerImpl::~SessionHandlerImpl(){
@@ -75,7 +74,7 @@ Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t cha
queue = getChannel(channel)->getDefaultQueue();
if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
} else {
- queue = queues->find(name);
+ queue = queues.find(name);
if (queue == 0) {
throw ChannelException( 404, "Queue not found: " + name);
}
@@ -85,7 +84,7 @@ Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t cha
Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){
- return exchanges->get(name);
+ return exchanges.get(name);
}
void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
@@ -96,8 +95,10 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
switch(body->type())
{
case REQUEST_BODY:
+ // responder.received(frame);
case RESPONSE_BODY:
- case METHOD_BODY:
+ // requester.received(frame);
+ case METHOD_BODY: //
method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
try{
method->invoke(*this, channel);
@@ -164,7 +165,7 @@ void SessionHandlerImpl::closed(){
}
for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
string name = (*i)->getName();
- queues->destroy(name);
+ queues.destroy(name);
exclusiveQueues.erase(i);
}
} catch(std::exception& e) {
@@ -221,7 +222,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const strin
parent->channels[channel] = new Channel(
parent->client->getProtocolVersion() , parent->context, channel,
- parent->framemax, parent->queues->getStore(),
+ parent->framemax, parent->queues.getStore(),
parent->settings.stagingThreshold);
// FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
@@ -251,12 +252,12 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16
const FieldTable& /*arguments*/){
if(passive){
- if(!parent->exchanges->get(exchange)){
+ if(!parent->exchanges.get(exchange)){
throw ChannelException(404, "Exchange not found: " + exchange);
}
}else{
try{
- std::pair<Exchange::shared_ptr, bool> response = parent->exchanges->declare(exchange, type);
+ std::pair<Exchange::shared_ptr, bool> response = parent->exchanges.declare(exchange, type);
if(!response.second && response.first->getType() != type){
throw ConnectionException(507, "Exchange already declared to be of type "
+ response.first->getType() + ", requested " + type);
@@ -288,7 +289,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16
const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
- parent->exchanges->destroy(exchange);
+ parent->exchanges.destroy(exchange);
if(!nowait) parent->client->getExchange().deleteOk(channel);
}
@@ -300,7 +301,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
queue = parent->getQueue(name, channel);
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
- parent->queues->declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0);
+ parent->queues.declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
@@ -310,11 +311,11 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
queue_created.first->create(arguments);
//add default binding:
- parent->exchanges->getDefault()->bind(queue, name, 0);
+ parent->exchanges.getDefault()->bind(queue, name, 0);
if (exclusive) {
parent->exclusiveQueues.push_back(queue);
} else if(autoDelete){
- parent->cleaner->add(queue);
+ parent->cleaner.add(queue);
}
}
}
@@ -332,7 +333,7 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t
const FieldTable& arguments){
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
- Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName);
+ Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName);
if(exchange){
// kpvdr - cannot use this any longer as routingKey is now const
// if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
@@ -369,7 +370,7 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t
}
count = q->getMessageCount();
q->destroy();
- parent->queues->destroy(queue);
+ parent->queues.destroy(queue);
}
if(!nowait) parent->client->getQueue().deleteOk(channel, count);
@@ -424,7 +425,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t
const string& exchangeName, const string& routingKey,
bool mandatory, bool immediate){
- Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
+ Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges.getDefault() : parent->exchanges.get(exchangeName);
if(exchange){
Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
parent->getChannel(channel)->handlePublish(msg, exchange);
@@ -652,3 +653,4 @@ SessionHandlerImpl::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
+}}