From bc84e62cc549ac2d751a45d61a867354c84c60d6 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 16 Jan 2007 20:17:50 +0000 Subject: * Renamed Session* classes to Connection* to align with AMQP spec - broker::SessionHandlerImpl -> broker::Connection - broker::SessionHandlerImplFactory -> broker::ConnectionFactory - sys::SessionHandler -> ConnectionInputHandler - sys::SessionHandlerFactory -> ConnectionInputHandlerFactory git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496848 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/lib/broker/Broker.cpp | 6 +- cpp/lib/broker/Broker.h | 4 +- cpp/lib/broker/Connection.cpp | 700 +++++++++++++++++++++ cpp/lib/broker/Connection.h | 360 +++++++++++ cpp/lib/broker/ConnectionFactory.cpp | 43 ++ cpp/lib/broker/ConnectionFactory.h | 46 ++ cpp/lib/broker/Makefile.am | 8 +- cpp/lib/broker/QueueRegistry.cpp | 1 - cpp/lib/broker/SessionHandlerFactoryImpl.cpp | 43 -- cpp/lib/broker/SessionHandlerFactoryImpl.h | 46 -- cpp/lib/broker/SessionHandlerImpl.cpp | 699 -------------------- cpp/lib/broker/SessionHandlerImpl.h | 381 ----------- cpp/lib/common/Exception.h | 11 + cpp/lib/common/Makefile.am | 4 +- cpp/lib/common/framing/Requester.cpp | 12 +- cpp/lib/common/sys/Acceptor.h | 4 +- cpp/lib/common/sys/ConnectionInputHandler.h | 45 ++ cpp/lib/common/sys/ConnectionInputHandlerFactory.h | 46 ++ cpp/lib/common/sys/SessionHandler.h | 45 -- cpp/lib/common/sys/SessionHandlerFactory.h | 46 -- cpp/lib/common/sys/apr/APRAcceptor.cpp | 6 +- cpp/lib/common/sys/apr/LFSessionContext.cpp | 2 +- cpp/lib/common/sys/apr/LFSessionContext.h | 6 +- cpp/lib/common/sys/posix/EventChannelAcceptor.cpp | 10 +- .../common/sys/posix/EventChannelConnection.cpp | 4 +- cpp/lib/common/sys/posix/EventChannelConnection.h | 10 +- cpp/lib/common/sys/posix/PosixAcceptor.cpp | 2 +- 27 files changed, 1291 insertions(+), 1299 deletions(-) create mode 100644 cpp/lib/broker/Connection.cpp create mode 100644 cpp/lib/broker/Connection.h create mode 100644 cpp/lib/broker/ConnectionFactory.cpp create mode 100644 cpp/lib/broker/ConnectionFactory.h delete mode 100644 cpp/lib/broker/SessionHandlerFactoryImpl.cpp delete mode 100644 cpp/lib/broker/SessionHandlerFactoryImpl.h delete mode 100644 cpp/lib/broker/SessionHandlerImpl.cpp delete mode 100644 cpp/lib/broker/SessionHandlerImpl.h create mode 100644 cpp/lib/common/sys/ConnectionInputHandler.h create mode 100644 cpp/lib/common/sys/ConnectionInputHandlerFactory.h delete mode 100644 cpp/lib/common/sys/SessionHandler.h delete mode 100644 cpp/lib/common/sys/SessionHandlerFactory.h (limited to 'cpp/lib') diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp index c2117eaf23..079eb5fd73 100644 --- a/cpp/lib/broker/Broker.cpp +++ b/cpp/lib/broker/Broker.cpp @@ -29,10 +29,10 @@ #include "MessageStoreModule.h" #include "NullMessageStore.h" #include "ProtocolInitiation.h" -#include "SessionHandlerImpl.h" +#include "Connection.h" #include "sys/SessionContext.h" -#include "sys/SessionHandler.h" -#include "sys/SessionHandlerFactory.h" +#include "sys/ConnectionInputHandler.h" +#include "sys/ConnectionInputHandlerFactory.h" #include "sys/TimeoutHandler.h" #include "Broker.h" diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h index ad7fbb1eca..929ed4360e 100644 --- a/cpp/lib/broker/Broker.h +++ b/cpp/lib/broker/Broker.h @@ -23,7 +23,7 @@ */ #include -#include +#include #include #include #include @@ -99,7 +99,7 @@ class Broker : public qpid::sys::Runnable, u_int32_t timeout; u_int64_t stagingThreshold; AutoDelete cleaner; - SessionHandlerFactoryImpl factory; + ConnectionFactory factory; qpid::framing::Requester requester; qpid::framing::Responder responder; }; diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp new file mode 100644 index 0000000000..c391ff6db5 --- /dev/null +++ b/cpp/lib/broker/Connection.cpp @@ -0,0 +1,700 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include +#include + +#include "Connection.h" + +#include "FanOutExchange.h" +#include "HeadersExchange.h" + +#include "Requester.h" +#include "Responder.h" + +using namespace boost; +using namespace qpid::sys; +using namespace qpid::framing; +using namespace qpid::sys; + +namespace qpid { +namespace broker { + +Connection::Connection( + SessionContext* _context, Broker& broker) : + + context(_context), + client(0), + queues(broker.getQueues()), + exchanges(broker.getExchanges()), + cleaner(broker.getCleaner()), + settings(broker.getTimeout(), broker.getStagingThreshold()), + requester(broker.getRequester()), + responder(broker.getResponder()), + basicHandler(new BasicHandlerImpl(this)), + channelHandler(new ChannelHandlerImpl(this)), + connectionHandler(new ConnectionHandlerImpl(this)), + exchangeHandler(new ExchangeHandlerImpl(this)), + queueHandler(new QueueHandlerImpl(this)), + txHandler(new TxHandlerImpl(this)), + messageHandler(new MessageHandlerImpl(this)), + framemax(65536), + heartbeat(0) +{} + +Connection::~Connection(){ + + if (client != NULL) + delete client; + +} + +Channel* Connection::getChannel(u_int16_t channel){ + channel_iterator i = channels.find(channel); + if(i == channels.end()){ + throw ConnectionException(504, "Unknown channel: " + channel); + } + return i->second; +} + +Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){ + Queue::shared_ptr queue; + if (name.empty()) { + queue = getChannel(channel)->getDefaultQueue(); + if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); + } else { + queue = queues.find(name); + if (queue == 0) { + throw ChannelException( 404, "Queue not found: " + name); + } + } + return queue; +} + + +Exchange::shared_ptr Connection::findExchange(const string& name){ + return exchanges.get(name); +} + +void Connection::handleMethod( + u_int16_t channel, qpid::framing::AMQBody::shared_ptr body) +{ + AMQMethodBody::shared_ptr method = + shared_polymorphic_cast(body); + try{ + method->invoke(*this, channel); + }catch(ChannelException& e){ + channels[channel]->close(); + channels.erase(channel); + client->getChannel().close( + channel, e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + }catch(ConnectionException& e){ + client->getConnection().close( + 0, e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + }catch(std::exception& e){ + client->getConnection().close( + 0, 541/*internal error*/, e.what(), + method->amqpClassId(), method->amqpMethodId()); + } +} + +void Connection::received(qpid::framing::AMQFrame* frame){ + u_int16_t channel = frame->getChannel(); + AMQBody::shared_ptr body = frame->getBody(); + switch(body->type()) + { + case REQUEST_BODY: + responder.received(AMQRequestBody::getData(body)); + handleMethod(channel, body); + break; + case RESPONSE_BODY: + // Must process responses before marking them received. + handleMethod(channel, body); + requester.processed(AMQResponseBody::getData(body)); + break; + // TODO aconway 2007-01-15: Leftover from 0-8 support, remove. + case METHOD_BODY: + handleMethod(channel, body); + break; + case HEADER_BODY: + handleHeader( + channel, shared_polymorphic_cast(body)); + break; + + case CONTENT_BODY: + handleContent( + channel, shared_polymorphic_cast(body)); + break; + + case HEARTBEAT_BODY: + assert(channel == 0); + handleHeartbeat( + shared_polymorphic_cast(body)); + break; + } +} + +/** + * An OutputHandler that does request/response procssing before + * delgating to another OutputHandler. + */ +Connection::Sender::Sender( + OutputHandler& oh, Requester& req, Responder& resp) + : out(oh), requester(req), responder(resp) +{} + +void Connection::Sender::send(AMQFrame* frame) { + AMQBody::shared_ptr body = frame->getBody(); + u_int16_t type = body->type(); + if (type == REQUEST_BODY) + requester.sending(AMQRequestBody::getData(body)); + else if (type == RESPONSE_BODY) + responder.sending(AMQResponseBody::getData(body)); + out.send(frame); +} + +void Connection::initiated(qpid::framing::ProtocolInitiation* header){ + + if (client == 0) + { + client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor()); + + + std::cout << "---------------" << this << std::endl; + + //send connection start + FieldTable properties; + string mechanisms("PLAIN"); + string locales("en_US"); // channel, majour, minor + client->getConnection().start(0, header->getMajor(), header->getMinor(), properties, mechanisms, locales); + } +} + + +void Connection::idleOut(){ + +} + +void Connection::idleIn(){ + +} + +void Connection::closed(){ + try { + for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){ + Channel* c = i->second; + channels.erase(i); + c->close(); + delete c; + } + for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){ + string name = (*i)->getName(); + queues.destroy(name); + exclusiveQueues.erase(i); + } + } catch(std::exception& e) { + std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl; + } +} + +void Connection::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ + getChannel(channel)->handleHeader(body); +} + +void Connection::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ + getChannel(channel)->handleContent(body); +} + +void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ + std::cout << "Connection::handleHeartbeat()" << std::endl; +} + +void Connection::ConnectionHandlerImpl::startOk( + u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, + const string& /*response*/, const string& /*locale*/){ + parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat); +} + +void Connection::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} + +void Connection::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ + parent->framemax = framemax; + parent->heartbeat = heartbeat; +} + +void Connection::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ + string knownhosts; + parent->client->getConnection().openOk(0, knownhosts); +} + +void Connection::ConnectionHandlerImpl::close( + u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, + u_int16_t /*classId*/, u_int16_t /*methodId*/) +{ + parent->client->getConnection().closeOk(0); + parent->context->close(); +} + +void Connection::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ + parent->context->close(); +} + + + +void Connection::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ + + + parent->channels[channel] = new Channel( + parent->client->getProtocolVersion() , parent->context, channel, + parent->framemax, parent->queues.getStore(), + parent->settings.stagingThreshold); + + // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9 + parent->client->getChannel().openOk(channel, std::string()/* ID */); +} + +void Connection::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} +void Connection::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} + +void Connection::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, + u_int16_t /*classId*/, u_int16_t /*methodId*/){ + Channel* c = parent->getChannel(channel); + if(c){ + parent->channels.erase(channel); + c->close(); + delete c; + parent->client->getChannel().closeOk(channel); + } +} + +void Connection::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} + + + +void Connection::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, + bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, + const FieldTable& /*arguments*/){ + + if(passive){ + if(!parent->exchanges.get(exchange)){ + throw ChannelException(404, "Exchange not found: " + exchange); + } + }else{ + try{ + std::pair response = parent->exchanges.declare(exchange, type); + if(!response.second && response.first->getType() != type){ + throw ConnectionException(507, "Exchange already declared to be of type " + + response.first->getType() + ", requested " + type); + } + }catch(UnknownExchangeTypeException& e){ + throw ConnectionException(503, "Exchange type not implemented: " + type); + } + } + if(!nowait){ + parent->client->getExchange().declareOk(channel); + } +} + + +void Connection::ExchangeHandlerImpl::unbind( + u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*exchange*/, + const string& /*routingKey*/, + const qpid::framing::FieldTable& /*arguments*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + + + +void Connection::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, + const string& exchange, bool /*ifUnused*/, bool nowait){ + + //TODO: implement unused + parent->exchanges.destroy(exchange); + if(!nowait) parent->client->getExchange().deleteOk(channel); +} + +void Connection::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ + Queue::shared_ptr queue; + if (passive && !name.empty()) { + queue = parent->getQueue(name, channel); + } else { + std::pair queue_created = + parent->queues.declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0); + queue = queue_created.first; + assert(queue); + if (queue_created.second) { // This is a new queue + parent->getChannel(channel)->setDefaultQueue(queue); + + //apply settings & create persistent record if required + queue_created.first->create(arguments); + + //add default binding: + parent->exchanges.getDefault()->bind(queue, name, 0); + if (exclusive) { + parent->exclusiveQueues.push_back(queue); + } else if(autoDelete){ + parent->cleaner.add(queue); + } + } + } + if (exclusive && !queue->isExclusiveOwner(parent)) { + throw ChannelException(405, "Cannot grant exclusive access to queue"); + } + if (!nowait) { + string queueName = queue->getName(); + parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount()); + } +} + +void Connection::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, + const string& exchangeName, const string& routingKey, bool nowait, + const FieldTable& arguments){ + + Queue::shared_ptr queue = parent->getQueue(queueName, channel); + Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName); + if(exchange){ + // kpvdr - cannot use this any longer as routingKey is now const + // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); + // exchange->bind(queue, routingKey, &arguments); + string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; + exchange->bind(queue, exchangeRoutingKey, &arguments); + if(!nowait) parent->client->getQueue().bindOk(channel); + }else{ + throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); + } +} + +void Connection::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ + + Queue::shared_ptr queue = parent->getQueue(queueName, channel); + int count = queue->purge(); + if(!nowait) parent->client->getQueue().purgeOk(channel, count); +} + +void Connection::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, + bool ifUnused, bool ifEmpty, bool nowait){ + ChannelException error(0, ""); + int count(0); + Queue::shared_ptr q = parent->getQueue(queue, channel); + if(ifEmpty && q->getMessageCount() > 0){ + throw ChannelException(406, "Queue not empty."); + }else if(ifUnused && q->getConsumerCount() > 0){ + throw ChannelException(406, "Queue in use."); + }else{ + //remove the queue from the list of exclusive queues if necessary + if(q->isExclusiveOwner(parent)){ + queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q); + if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i); + } + count = q->getMessageCount(); + q->destroy(); + parent->queues.destroy(queue); + } + + if(!nowait) parent->client->getQueue().deleteOk(channel, count); +} + + + + +void Connection::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ + //TODO: handle global + parent->getChannel(channel)->setPrefetchSize(prefetchSize); + parent->getChannel(channel)->setPrefetchCount(prefetchCount); + parent->client->getBasic().qosOk(channel); +} + +void Connection::BasicHandlerImpl::consume( + u_int16_t channelId, u_int16_t /*ticket*/, + const string& queueName, const string& consumerTag, + bool noLocal, bool noAck, bool exclusive, + bool nowait, const FieldTable& fields) +{ + + Queue::shared_ptr queue = parent->getQueue(queueName, channelId); + Channel* channel = parent->channels[channelId]; + if(!consumerTag.empty() && channel->exists(consumerTag)){ + throw ConnectionException(530, "Consumer tags must be unique"); + } + + try{ + string newTag = consumerTag; + channel->consume( + newTag, queue, !noAck, exclusive, noLocal ? parent : 0, &fields); + + if(!nowait) parent->client->getBasic().consumeOk(channelId, newTag); + + //allow messages to be dispatched if required as there is now a consumer: + queue->dispatch(); + }catch(ExclusiveAccessException& e){ + if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); + else throw ChannelException(403, "Access would violate previously granted exclusivity"); + } + +} + +void Connection::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ + parent->getChannel(channel)->cancel(consumerTag); + + if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag); +} + +void Connection::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, + const string& exchangeName, const string& routingKey, + bool mandatory, bool immediate){ + + Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges.getDefault() : parent->exchanges.get(exchangeName); + if(exchange){ + Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate); + parent->getChannel(channel)->handlePublish(msg, exchange); + }else{ + throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + } +} + +void Connection::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ + Queue::shared_ptr queue = parent->getQueue(queueName, channelId); + if(!parent->getChannel(channelId)->get(queue, !noAck)){ + string clusterId;//not used, part of an imatix hack + + parent->client->getBasic().getEmpty(channelId, clusterId); + } +} + +void Connection::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ + try{ + parent->getChannel(channel)->ack(deliveryTag, multiple); + }catch(InvalidAckException& e){ + throw ConnectionException(530, "Received ack for unrecognised delivery tag"); + } +} + +void Connection::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} + +void Connection::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ + parent->getChannel(channel)->recover(requeue); +} + +void Connection::TxHandlerImpl::select(u_int16_t channel){ + parent->getChannel(channel)->begin(); + parent->client->getTx().selectOk(channel); +} + +void Connection::TxHandlerImpl::commit(u_int16_t channel){ + parent->getChannel(channel)->commit(); + parent->client->getTx().commitOk(channel); +} + +void Connection::TxHandlerImpl::rollback(u_int16_t channel){ + + parent->getChannel(channel)->rollback(); + parent->client->getTx().rollbackOk(channel); + parent->getChannel(channel)->recover(false); +} + +void +Connection::QueueHandlerImpl::unbind( + u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*exchange*/, + const string& /*routingKey*/, + const qpid::framing::FieldTable& /*arguments*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + +void +Connection::ChannelHandlerImpl::ok( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + +void +Connection::ChannelHandlerImpl::ping( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + +void +Connection::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + +void +Connection::ChannelHandlerImpl::resume( + u_int16_t /*channel*/, + const string& /*channelId*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + +// Message class method handlers +void +Connection::MessageHandlerImpl::append( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*bytes*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + + +void +Connection::MessageHandlerImpl::cancel( u_int16_t /*channel*/, + const string& /*destination*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*identifier*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::close( u_int16_t /*channel*/, + const string& /*reference*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::consume( u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*destination*/, + bool /*noLocal*/, + bool /*noAck*/, + bool /*exclusive*/, + const qpid::framing::FieldTable& /*filter*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::get( u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*destination*/, + bool /*noAck*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::offset( u_int16_t /*channel*/, + u_int64_t /*value*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::open( u_int16_t /*channel*/, + const string& /*reference*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::qos( u_int16_t /*channel*/, + u_int32_t /*prefetchSize*/, + u_int16_t /*prefetchCount*/, + bool /*global*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::recover( u_int16_t /*channel*/, + bool /*requeue*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::reject( u_int16_t /*channel*/, + u_int16_t /*code*/, + const string& /*text*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::resume( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*identifier*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +Connection::MessageHandlerImpl::transfer( u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*destination*/, + bool /*redelivered*/, + bool /*immediate*/, + u_int64_t /*ttl*/, + u_int8_t /*priority*/, + u_int64_t /*timestamp*/, + u_int8_t /*deliveryMode*/, + u_int64_t /*expiration*/, + const string& /*exchange*/, + const string& /*routingKey*/, + const string& /*messageId*/, + const string& /*correlationId*/, + const string& /*replyTo*/, + const string& /*contentType*/, + const string& /*contentEncoding*/, + const string& /*userId*/, + const string& /*appId*/, + const string& /*transactionId*/, + const string& /*securityToken*/, + const qpid::framing::FieldTable& /*applicationHeaders*/, + qpid::framing::Content /*body*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +}} + diff --git a/cpp/lib/broker/Connection.h b/cpp/lib/broker/Connection.h new file mode 100644 index 0000000000..2f2770b28d --- /dev/null +++ b/cpp/lib/broker/Connection.h @@ -0,0 +1,360 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Connection_ +#define _Connection_ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include "Broker.h" +#include "Exception.h" + +namespace qpid { +namespace broker { + +class Settings { + public: + const u_int32_t timeout;//timeout for auto-deleted queues (in ms) + const u_int64_t stagingThreshold; + + Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {} +}; + +class Connection : public qpid::sys::ConnectionInputHandler, + public qpid::framing::AMQP_ServerOperations, + public ConnectionToken +{ + typedef std::map::iterator channel_iterator; + typedef std::vector::iterator queue_iterator; + class Sender : public qpid::framing::OutputHandler { + public: + Sender(qpid::framing::OutputHandler&, + qpid::framing::Requester&, qpid::framing::Responder&); + void send(qpid::framing::AMQFrame* frame); + private: + OutputHandler& out; + qpid::framing::Requester& requester; + qpid::framing::Responder& responder; + }; + + qpid::sys::SessionContext* context; + qpid::framing::AMQP_ClientProxy* client; + QueueRegistry& queues; + ExchangeRegistry& exchanges; + AutoDelete& cleaner; + Settings settings; + qpid::framing::Requester& requester; + qpid::framing::Responder& responder; + std::auto_ptr basicHandler; + std::auto_ptr channelHandler; + std::auto_ptr connectionHandler; + std::auto_ptr exchangeHandler; + std::auto_ptr queueHandler; + std::auto_ptr txHandler; + std::auto_ptr messageHandler; + + std::map channels; + std::vector exclusiveQueues; + + u_int32_t framemax; + u_int16_t heartbeat; + + void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body); + void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body); + void handleMethod(u_int16_t channel, qpid::framing::AMQBody::shared_ptr body); + void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + + Channel* getChannel(u_int16_t channel); + /** + * Get named queue, never returns 0. + * @return: named queue or default queue for channel if name="" + * @exception: ChannelException if no queue of that name is found. + * @exception: ConnectionException if no queue specified and channel has not declared one. + */ + Queue::shared_ptr getQueue(const string& name, u_int16_t channel); + + Exchange::shared_ptr findExchange(const string& name); + + public: + Connection(qpid::sys::SessionContext* context, Broker& broker); + virtual void received(qpid::framing::AMQFrame* frame); + virtual void initiated(qpid::framing::ProtocolInitiation* header); + virtual void idleOut(); + virtual void idleIn(); + virtual void closed(); + virtual ~Connection(); + + class ConnectionHandlerImpl : public ConnectionHandler{ + Connection* parent; + public: + inline ConnectionHandlerImpl(Connection* _parent) : parent(_parent) {} + + virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties, const string& mechanism, + const string& response, const string& locale); + + virtual void secureOk(u_int16_t channel, const string& response); + + virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat); + + virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist); + + virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId, + u_int16_t methodId); + + virtual void closeOk(u_int16_t channel); + + virtual ~ConnectionHandlerImpl(){} + }; + + class ChannelHandlerImpl : public ChannelHandler{ + Connection* parent; + public: + inline ChannelHandlerImpl(Connection* _parent) : parent(_parent) {} + + virtual void open(u_int16_t channel, const string& outOfBand); + + virtual void flow(u_int16_t channel, bool active); + + virtual void flowOk(u_int16_t channel, bool active); + + virtual void ok( u_int16_t channel ); + + virtual void ping( u_int16_t channel ); + + virtual void pong( u_int16_t channel ); + + virtual void resume( u_int16_t channel, + const string& channelId ); + + virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, + u_int16_t classId, u_int16_t methodId); + + virtual void closeOk(u_int16_t channel); + + virtual ~ChannelHandlerImpl(){} + }; + + class ExchangeHandlerImpl : public ExchangeHandler{ + Connection* parent; + public: + inline ExchangeHandlerImpl(Connection* _parent) : parent(_parent) {} + + virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type, + bool passive, bool durable, bool autoDelete, bool internal, bool nowait, + const qpid::framing::FieldTable& arguments); + + virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait); + + virtual void unbind(u_int16_t channel, + u_int16_t ticket, + const string& queue, + const string& exchange, + const string& routingKey, + const qpid::framing::FieldTable& arguments ); + + virtual ~ExchangeHandlerImpl(){} + }; + + + class QueueHandlerImpl : public QueueHandler{ + Connection* parent; + public: + inline QueueHandlerImpl(Connection* _parent) : parent(_parent) {} + + virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments); + + virtual void bind(u_int16_t channel, u_int16_t ticket, const string& queue, + const string& exchange, const string& routingKey, bool nowait, + const qpid::framing::FieldTable& arguments); + + virtual void unbind(u_int16_t channel, + u_int16_t ticket, + const string& queue, + const string& exchange, + const string& routingKey, + const qpid::framing::FieldTable& arguments ); + + virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue, + bool nowait); + + virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue, bool ifUnused, bool ifEmpty, + bool nowait); + + virtual ~QueueHandlerImpl(){} + }; + + class BasicHandlerImpl : public BasicHandler{ + Connection* parent; + public: + inline BasicHandlerImpl(Connection* _parent) : parent(_parent) {} + + virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); + + virtual void consume( + u_int16_t channel, u_int16_t ticket, const string& queue, + const string& consumerTag, bool noLocal, bool noAck, + bool exclusive, bool nowait, + const qpid::framing::FieldTable& fields); + + virtual void cancel(u_int16_t channel, const string& consumerTag, bool nowait); + + virtual void publish(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& routingKey, + bool mandatory, bool immediate); + + virtual void get(u_int16_t channel, u_int16_t ticket, const string& queue, bool noAck); + + virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); + + virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); + + virtual void recover(u_int16_t channel, bool requeue); + + virtual ~BasicHandlerImpl(){} + }; + + class TxHandlerImpl : public TxHandler{ + Connection* parent; + public: + TxHandlerImpl(Connection* _parent) : parent(_parent) {} + virtual ~TxHandlerImpl() {} + virtual void select(u_int16_t channel); + virtual void commit(u_int16_t channel); + virtual void rollback(u_int16_t channel); + }; + + class MessageHandlerImpl : public MessageHandler { + Connection* parent; + + // Constructors and destructors + + public: + MessageHandlerImpl() {} + MessageHandlerImpl(Connection* _parent) : parent(_parent) {} + virtual ~MessageHandlerImpl() {} + + // Protocol methods + virtual void append( u_int16_t channel, + const string& reference, + const string& bytes ); + + virtual void cancel( u_int16_t channel, + const string& destination ); + + virtual void checkpoint( u_int16_t channel, + const string& reference, + const string& identifier ); + + virtual void close( u_int16_t channel, + const string& reference ); + + virtual void consume( u_int16_t channel, + u_int16_t ticket, + const string& queue, + const string& destination, + bool noLocal, + bool noAck, + bool exclusive, + const qpid::framing::FieldTable& filter ); + + virtual void empty( u_int16_t channel ); + + virtual void get( u_int16_t channel, + u_int16_t ticket, + const string& queue, + const string& destination, + bool noAck ); + + virtual void offset( u_int16_t channel, + u_int64_t value ); + + virtual void ok( u_int16_t channel ); + + virtual void open( u_int16_t channel, + const string& reference ); + + virtual void qos( u_int16_t channel, + u_int32_t prefetchSize, + u_int16_t prefetchCount, + bool global ); + + virtual void recover( u_int16_t channel, + bool requeue ); + + virtual void reject( u_int16_t channel, + u_int16_t code, + const string& text ); + + virtual void resume( u_int16_t channel, + const string& reference, + const string& identifier ); + + virtual void transfer( u_int16_t channel, + u_int16_t ticket, + const string& destination, + bool redelivered, + bool immediate, + u_int64_t ttl, + u_int8_t priority, + u_int64_t timestamp, + u_int8_t deliveryMode, + u_int64_t expiration, + const string& exchange, + const string& routingKey, + const string& messageId, + const string& correlationId, + const string& replyTo, + const string& contentType, + const string& contentEncoding, + const string& userId, + const string& appId, + const string& transactionId, + const string& securityToken, + const qpid::framing::FieldTable& applicationHeaders, + qpid::framing::Content body ); + }; + + virtual ChannelHandler* getChannelHandler(){ return channelHandler.get(); } + virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler.get(); } + virtual BasicHandler* getBasicHandler(){ return basicHandler.get(); } + virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler.get(); } + virtual QueueHandler* getQueueHandler(){ return queueHandler.get(); } + virtual TxHandler* getTxHandler(){ return txHandler.get(); } + virtual MessageHandler* getMessageHandler(){ return messageHandler.get(); } + + virtual AccessHandler* getAccessHandler(){ throw ConnectionException(540, "Access class not implemented"); } + virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); } + virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); } + virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); } + virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); } +}; + +}} + +#endif diff --git a/cpp/lib/broker/ConnectionFactory.cpp b/cpp/lib/broker/ConnectionFactory.cpp new file mode 100644 index 0000000000..3c4c7cd724 --- /dev/null +++ b/cpp/lib/broker/ConnectionFactory.cpp @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include +#include + +namespace qpid { +namespace broker { + + +ConnectionFactory::ConnectionFactory(Broker& b) : broker(b) +{} + + +ConnectionFactory::~ConnectionFactory() +{ + broker.getCleaner().stop(); +} + +qpid::sys::ConnectionInputHandler* +ConnectionFactory::create(qpid::sys::SessionContext* ctxt) +{ + return new Connection(ctxt, broker); +} + +}} // namespace qpid::broker diff --git a/cpp/lib/broker/ConnectionFactory.h b/cpp/lib/broker/ConnectionFactory.h new file mode 100644 index 0000000000..fe8052ed9c --- /dev/null +++ b/cpp/lib/broker/ConnectionFactory.h @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ConnectionFactory_ +#define _ConnectionFactory_ + +#include "ConnectionInputHandlerFactory.h" + +namespace qpid { +namespace broker { +class Broker; + +class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory +{ + public: + ConnectionFactory(Broker& b); + + virtual qpid::sys::ConnectionInputHandler* create(qpid::sys::SessionContext* ctxt); + + virtual ~ConnectionFactory(); + + private: + Broker& broker; +}; + +}} + + +#endif diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am index 01a5b3d847..b1a0c1af78 100644 --- a/cpp/lib/broker/Makefile.am +++ b/cpp/lib/broker/Makefile.am @@ -65,10 +65,10 @@ libqpidbroker_la_SOURCES = \ QueueRegistry.h \ RecoveryManager.cpp \ RecoveryManager.h \ - SessionHandlerFactoryImpl.cpp \ - SessionHandlerFactoryImpl.h \ - SessionHandlerImpl.cpp \ - SessionHandlerImpl.h \ + ConnectionFactory.cpp \ + ConnectionFactory.h \ + Connection.cpp \ + Connection.h \ TopicExchange.cpp \ TopicExchange.h \ TransactionalStore.h \ diff --git a/cpp/lib/broker/QueueRegistry.cpp b/cpp/lib/broker/QueueRegistry.cpp index 2d1382ef09..ff2f83b725 100644 --- a/cpp/lib/broker/QueueRegistry.cpp +++ b/cpp/lib/broker/QueueRegistry.cpp @@ -19,7 +19,6 @@ * */ #include -#include #include #include diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp b/cpp/lib/broker/SessionHandlerFactoryImpl.cpp deleted file mode 100644 index 559fd6bca1..0000000000 --- a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include -#include - -namespace qpid { -namespace broker { - - -SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(Broker& b) : broker(b) -{} - - -SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl() -{ - broker.getCleaner().stop(); -} - -qpid::sys::SessionHandler* -SessionHandlerFactoryImpl::create(qpid::sys::SessionContext* ctxt) -{ - return new SessionHandlerImpl(ctxt, broker); -} - -}} // namespace qpid::broker diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.h b/cpp/lib/broker/SessionHandlerFactoryImpl.h deleted file mode 100644 index 49c42b4d1c..0000000000 --- a/cpp/lib/broker/SessionHandlerFactoryImpl.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _SessionHandlerFactoryImpl_ -#define _SessionHandlerFactoryImpl_ - -#include "SessionHandlerFactory.h" - -namespace qpid { -namespace broker { -class Broker; - -class SessionHandlerFactoryImpl : public qpid::sys::SessionHandlerFactory -{ - public: - SessionHandlerFactoryImpl(Broker& b); - - virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt); - - virtual ~SessionHandlerFactoryImpl(); - - private: - Broker& broker; -}; - -}} - - -#endif diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp deleted file mode 100644 index d7f6320535..0000000000 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ /dev/null @@ -1,699 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include -#include - -#include "SessionHandlerImpl.h" - -#include "FanOutExchange.h" -#include "HeadersExchange.h" - -#include "Requester.h" -#include "Responder.h" - -using namespace boost; -using namespace qpid::sys; -using namespace qpid::framing; -using namespace qpid::sys; - -namespace qpid { -namespace broker { - -SessionHandlerImpl::SessionHandlerImpl( - SessionContext* _context, Broker& broker) : - - context(_context), - client(0), - queues(broker.getQueues()), - exchanges(broker.getExchanges()), - cleaner(broker.getCleaner()), - settings(broker.getTimeout(), broker.getStagingThreshold()), - requester(broker.getRequester()), - responder(broker.getResponder()), - basicHandler(new BasicHandlerImpl(this)), - channelHandler(new ChannelHandlerImpl(this)), - connectionHandler(new ConnectionHandlerImpl(this)), - exchangeHandler(new ExchangeHandlerImpl(this)), - queueHandler(new QueueHandlerImpl(this)), - txHandler(new TxHandlerImpl(this)), - messageHandler(new MessageHandlerImpl(this)), - framemax(65536), - heartbeat(0) -{} - -SessionHandlerImpl::~SessionHandlerImpl(){ - - if (client != NULL) - delete client; - -} - -Channel* SessionHandlerImpl::getChannel(u_int16_t channel){ - channel_iterator i = channels.find(channel); - if(i == channels.end()){ - throw ConnectionException(504, "Unknown channel: " + channel); - } - return i->second; -} - -Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){ - Queue::shared_ptr queue; - if (name.empty()) { - queue = getChannel(channel)->getDefaultQueue(); - if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); - } else { - queue = queues.find(name); - if (queue == 0) { - throw ChannelException( 404, "Queue not found: " + name); - } - } - return queue; -} - - -Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){ - return exchanges.get(name); -} - -void SessionHandlerImpl::handleMethod( - u_int16_t channel, qpid::framing::AMQBody::shared_ptr body) -{ - AMQMethodBody::shared_ptr method = - shared_polymorphic_cast(body); - try{ - method->invoke(*this, channel); - }catch(ChannelException& e){ - channels[channel]->close(); - channels.erase(channel); - client->getChannel().close( - channel, e.code, e.text, - method->amqpClassId(), method->amqpMethodId()); - }catch(ConnectionException& e){ - client->getConnection().close( - 0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - client->getConnection().close( - 0, 541/*internal error*/, e.what(), - method->amqpClassId(), method->amqpMethodId()); - } -} - -void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ - u_int16_t channel = frame->getChannel(); - AMQBody::shared_ptr body = frame->getBody(); - switch(body->type()) - { - case REQUEST_BODY: - responder.received(AMQRequestBody::getData(body)); - handleMethod(channel, body); - break; - case RESPONSE_BODY: - // Must process responses before marking them received. - handleMethod(channel, body); - requester.processed(AMQResponseBody::getData(body)); - break; - // TODO aconway 2007-01-15: Leftover from 0-8 support, remove. - case METHOD_BODY: - handleMethod(channel, body); - break; - case HEADER_BODY: - handleHeader( - channel, shared_polymorphic_cast(body)); - break; - - case CONTENT_BODY: - handleContent( - channel, shared_polymorphic_cast(body)); - break; - - case HEARTBEAT_BODY: - assert(channel == 0); - handleHeartbeat( - shared_polymorphic_cast(body)); - break; - } -} - -/** - * An OutputHandler that does request/response procssing before - * delgating to another OutputHandler. - */ -SessionHandlerImpl::Sender::Sender( - OutputHandler& oh, Requester& req, Responder& resp) - : out(oh), requester(req), responder(resp) -{} - -void SessionHandlerImpl::Sender::send(AMQFrame* frame) { - AMQBody::shared_ptr body = frame->getBody(); - u_int16_t type = body->type(); - if (type == REQUEST_BODY) - requester.sending(AMQRequestBody::getData(body)); - else if (type == RESPONSE_BODY) - responder.sending(AMQResponseBody::getData(body)); - out.send(frame); -} - -void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){ - - if (client == 0) - { - client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor()); - - - std::cout << "---------------" << this << std::endl; - - //send connection start - FieldTable properties; - string mechanisms("PLAIN"); - string locales("en_US"); // channel, majour, minor - client->getConnection().start(0, header->getMajor(), header->getMinor(), properties, mechanisms, locales); - } -} - - -void SessionHandlerImpl::idleOut(){ - -} - -void SessionHandlerImpl::idleIn(){ - -} - -void SessionHandlerImpl::closed(){ - try { - for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){ - Channel* c = i->second; - channels.erase(i); - c->close(); - delete c; - } - for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){ - string name = (*i)->getName(); - queues.destroy(name); - exclusiveQueues.erase(i); - } - } catch(std::exception& e) { - std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl; - } -} - -void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ - getChannel(channel)->handleHeader(body); -} - -void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ - getChannel(channel)->handleContent(body); -} - -void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ - std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl; -} - -void SessionHandlerImpl::ConnectionHandlerImpl::startOk( - u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, - const string& /*response*/, const string& /*locale*/){ - parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat); -} - -void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} - -void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ - parent->framemax = framemax; - parent->heartbeat = heartbeat; -} - -void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ - string knownhosts; - parent->client->getConnection().openOk(0, knownhosts); -} - -void SessionHandlerImpl::ConnectionHandlerImpl::close( - u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, - u_int16_t /*classId*/, u_int16_t /*methodId*/) -{ - parent->client->getConnection().closeOk(0); - parent->context->close(); -} - -void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ - parent->context->close(); -} - - - -void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ - - - parent->channels[channel] = new Channel( - parent->client->getProtocolVersion() , parent->context, channel, - parent->framemax, parent->queues.getStore(), - parent->settings.stagingThreshold); - - // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9 - parent->client->getChannel().openOk(channel, std::string()/* ID */); -} - -void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} -void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} - -void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, - u_int16_t /*classId*/, u_int16_t /*methodId*/){ - Channel* c = parent->getChannel(channel); - if(c){ - parent->channels.erase(channel); - c->close(); - delete c; - parent->client->getChannel().closeOk(channel); - } -} - -void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} - - - -void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& /*arguments*/){ - - if(passive){ - if(!parent->exchanges.get(exchange)){ - throw ChannelException(404, "Exchange not found: " + exchange); - } - }else{ - try{ - std::pair response = parent->exchanges.declare(exchange, type); - if(!response.second && response.first->getType() != type){ - throw ConnectionException(507, "Exchange already declared to be of type " - + response.first->getType() + ", requested " + type); - } - }catch(UnknownExchangeTypeException& e){ - throw ConnectionException(503, "Exchange type not implemented: " + type); - } - } - if(!nowait){ - parent->client->getExchange().declareOk(channel); - } -} - - -void SessionHandlerImpl::ExchangeHandlerImpl::unbind( - u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*exchange*/, - const string& /*routingKey*/, - const qpid::framing::FieldTable& /*arguments*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - - - -void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, - const string& exchange, bool /*ifUnused*/, bool nowait){ - - //TODO: implement unused - parent->exchanges.destroy(exchange); - if(!nowait) parent->client->getExchange().deleteOk(channel); -} - -void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ - Queue::shared_ptr queue; - if (passive && !name.empty()) { - queue = parent->getQueue(name, channel); - } else { - std::pair queue_created = - parent->queues.declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0); - queue = queue_created.first; - assert(queue); - if (queue_created.second) { // This is a new queue - parent->getChannel(channel)->setDefaultQueue(queue); - - //apply settings & create persistent record if required - queue_created.first->create(arguments); - - //add default binding: - parent->exchanges.getDefault()->bind(queue, name, 0); - if (exclusive) { - parent->exclusiveQueues.push_back(queue); - } else if(autoDelete){ - parent->cleaner.add(queue); - } - } - } - if (exclusive && !queue->isExclusiveOwner(parent)) { - throw ChannelException(405, "Cannot grant exclusive access to queue"); - } - if (!nowait) { - string queueName = queue->getName(); - parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount()); - } -} - -void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, bool nowait, - const FieldTable& arguments){ - - Queue::shared_ptr queue = parent->getQueue(queueName, channel); - Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName); - if(exchange){ - // kpvdr - cannot use this any longer as routingKey is now const - // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); - // exchange->bind(queue, routingKey, &arguments); - string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; - exchange->bind(queue, exchangeRoutingKey, &arguments); - if(!nowait) parent->client->getQueue().bindOk(channel); - }else{ - throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); - } -} - -void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ - - Queue::shared_ptr queue = parent->getQueue(queueName, channel); - int count = queue->purge(); - if(!nowait) parent->client->getQueue().purgeOk(channel, count); -} - -void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ - ChannelException error(0, ""); - int count(0); - Queue::shared_ptr q = parent->getQueue(queue, channel); - if(ifEmpty && q->getMessageCount() > 0){ - throw ChannelException(406, "Queue not empty."); - }else if(ifUnused && q->getConsumerCount() > 0){ - throw ChannelException(406, "Queue in use."); - }else{ - //remove the queue from the list of exclusive queues if necessary - if(q->isExclusiveOwner(parent)){ - queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q); - if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i); - } - count = q->getMessageCount(); - q->destroy(); - parent->queues.destroy(queue); - } - - if(!nowait) parent->client->getQueue().deleteOk(channel, count); -} - - - - -void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ - //TODO: handle global - parent->getChannel(channel)->setPrefetchSize(prefetchSize); - parent->getChannel(channel)->setPrefetchCount(prefetchCount); - parent->client->getBasic().qosOk(channel); -} - -void SessionHandlerImpl::BasicHandlerImpl::consume( - u_int16_t channelId, u_int16_t /*ticket*/, - const string& queueName, const string& consumerTag, - bool noLocal, bool noAck, bool exclusive, - bool nowait, const FieldTable& fields) -{ - - Queue::shared_ptr queue = parent->getQueue(queueName, channelId); - Channel* channel = parent->channels[channelId]; - if(!consumerTag.empty() && channel->exists(consumerTag)){ - throw ConnectionException(530, "Consumer tags must be unique"); - } - - try{ - string newTag = consumerTag; - channel->consume( - newTag, queue, !noAck, exclusive, noLocal ? parent : 0, &fields); - - if(!nowait) parent->client->getBasic().consumeOk(channelId, newTag); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); - }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); - } - -} - -void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ - parent->getChannel(channel)->cancel(consumerTag); - - if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag); -} - -void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, - const string& exchangeName, const string& routingKey, - bool mandatory, bool immediate){ - - Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges.getDefault() : parent->exchanges.get(exchangeName); - if(exchange){ - Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate); - parent->getChannel(channel)->handlePublish(msg, exchange); - }else{ - throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); - } -} - -void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ - Queue::shared_ptr queue = parent->getQueue(queueName, channelId); - if(!parent->getChannel(channelId)->get(queue, !noAck)){ - string clusterId;//not used, part of an imatix hack - - parent->client->getBasic().getEmpty(channelId, clusterId); - } -} - -void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ - try{ - parent->getChannel(channel)->ack(deliveryTag, multiple); - }catch(InvalidAckException& e){ - throw ConnectionException(530, "Received ack for unrecognised delivery tag"); - } -} - -void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} - -void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ - parent->getChannel(channel)->recover(requeue); -} - -void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){ - parent->getChannel(channel)->begin(); - parent->client->getTx().selectOk(channel); -} - -void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){ - parent->getChannel(channel)->commit(); - parent->client->getTx().commitOk(channel); -} - -void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){ - - parent->getChannel(channel)->rollback(); - parent->client->getTx().rollbackOk(channel); - parent->getChannel(channel)->recover(false); -} - -void -SessionHandlerImpl::QueueHandlerImpl::unbind( - u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*exchange*/, - const string& /*routingKey*/, - const qpid::framing::FieldTable& /*arguments*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -void -SessionHandlerImpl::ChannelHandlerImpl::ok( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -void -SessionHandlerImpl::ChannelHandlerImpl::ping( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -void -SessionHandlerImpl::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -void -SessionHandlerImpl::ChannelHandlerImpl::resume( - u_int16_t /*channel*/, - const string& /*channelId*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -// Message class method handlers -void -SessionHandlerImpl::MessageHandlerImpl::append( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*bytes*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - - -void -SessionHandlerImpl::MessageHandlerImpl::cancel( u_int16_t /*channel*/, - const string& /*destination*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -SessionHandlerImpl::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -SessionHandlerImpl::MessageHandlerImpl::close( u_int16_t /*channel*/, - const string& /*reference*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -SessionHandlerImpl::MessageHandlerImpl::consume( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noLocal*/, - bool /*noAck*/, - bool /*exclusive*/, - const qpid::framing::FieldTable& /*filter*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -SessionHandlerImpl::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -SessionHandlerImpl::MessageHandlerImpl::get( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noAck*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -SessionHandlerImpl::MessageHandlerImpl::offset( u_int16_t /*channel*/, - u_int64_t /*value*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -SessionHandlerImpl::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -SessionHandlerImpl::MessageHandlerImpl::open( u_int16_t /*channel*/, - const string& /*reference*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -SessionHandlerImpl::MessageHandlerImpl::qos( u_int16_t /*channel*/, - u_int32_t /*prefetchSize*/, - u_int16_t /*prefetchCount*/, - bool /*global*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -SessionHandlerImpl::MessageHandlerImpl::recover( u_int16_t /*channel*/, - bool /*requeue*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -SessionHandlerImpl::MessageHandlerImpl::reject( u_int16_t /*channel*/, - u_int16_t /*code*/, - const string& /*text*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -SessionHandlerImpl::MessageHandlerImpl::resume( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -SessionHandlerImpl::MessageHandlerImpl::transfer( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*destination*/, - bool /*redelivered*/, - bool /*immediate*/, - u_int64_t /*ttl*/, - u_int8_t /*priority*/, - u_int64_t /*timestamp*/, - u_int8_t /*deliveryMode*/, - u_int64_t /*expiration*/, - const string& /*exchange*/, - const string& /*routingKey*/, - const string& /*messageId*/, - const string& /*correlationId*/, - const string& /*replyTo*/, - const string& /*contentType*/, - const string& /*contentEncoding*/, - const string& /*userId*/, - const string& /*appId*/, - const string& /*transactionId*/, - const string& /*securityToken*/, - const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content /*body*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -}} - diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h deleted file mode 100644 index 070bd1266e..0000000000 --- a/cpp/lib/broker/SessionHandlerImpl.h +++ /dev/null @@ -1,381 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _SessionHandlerImpl_ -#define _SessionHandlerImpl_ - -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include "Broker.h" -#include "Exception.h" - -namespace qpid { -namespace broker { - -struct ChannelException : public qpid::Exception { - u_int16_t code; - string text; - ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {} - ~ChannelException() throw() {} - const char* what() const throw() { return text.c_str(); } -}; - -struct ConnectionException : public qpid::Exception { - u_int16_t code; - string text; - ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {} - ~ConnectionException() throw() {} - const char* what() const throw() { return text.c_str(); } -}; - -class Settings { - public: - const u_int32_t timeout;//timeout for auto-deleted queues (in ms) - const u_int64_t stagingThreshold; - - Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {} -}; - -class SessionHandlerImpl : public qpid::sys::SessionHandler, - public qpid::framing::AMQP_ServerOperations, - public ConnectionToken -{ - typedef std::map::iterator channel_iterator; - typedef std::vector::iterator queue_iterator; - class Sender : public qpid::framing::OutputHandler { - public: - Sender(qpid::framing::OutputHandler&, - qpid::framing::Requester&, qpid::framing::Responder&); - void send(qpid::framing::AMQFrame* frame); - private: - OutputHandler& out; - qpid::framing::Requester& requester; - qpid::framing::Responder& responder; - }; - - qpid::sys::SessionContext* context; - qpid::framing::AMQP_ClientProxy* client; - QueueRegistry& queues; - ExchangeRegistry& exchanges; - AutoDelete& cleaner; - Settings settings; - qpid::framing::Requester& requester; - qpid::framing::Responder& responder; - std::auto_ptr basicHandler; - std::auto_ptr channelHandler; - std::auto_ptr connectionHandler; - std::auto_ptr exchangeHandler; - std::auto_ptr queueHandler; - std::auto_ptr txHandler; - std::auto_ptr messageHandler; - - std::map channels; - std::vector exclusiveQueues; - - u_int32_t framemax; - u_int16_t heartbeat; - - void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body); - void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body); - void handleMethod(u_int16_t channel, qpid::framing::AMQBody::shared_ptr body); - void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); - - Channel* getChannel(u_int16_t channel); - /** - * Get named queue, never returns 0. - * @return: named queue or default queue for channel if name="" - * @exception: ChannelException if no queue of that name is found. - * @exception: ConnectionException if no queue specified and channel has not declared one. - */ - Queue::shared_ptr getQueue(const string& name, u_int16_t channel); - - Exchange::shared_ptr findExchange(const string& name); - - public: - SessionHandlerImpl(qpid::sys::SessionContext* context, Broker& broker); - virtual void received(qpid::framing::AMQFrame* frame); - virtual void initiated(qpid::framing::ProtocolInitiation* header); - virtual void idleOut(); - virtual void idleIn(); - virtual void closed(); - virtual ~SessionHandlerImpl(); - - class ConnectionHandlerImpl : public ConnectionHandler{ - SessionHandlerImpl* parent; - public: - inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - - virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties, const string& mechanism, - const string& response, const string& locale); - - // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 - virtual void secureOk(u_int16_t channel, const string& response); - - virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat); - - // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 - virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist); - - // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 - virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId, - u_int16_t methodId); - - virtual void closeOk(u_int16_t channel); - - virtual ~ConnectionHandlerImpl(){} - }; - - class ChannelHandlerImpl : public ChannelHandler{ - SessionHandlerImpl* parent; - public: - inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - - // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 - virtual void open(u_int16_t channel, const string& outOfBand); - - virtual void flow(u_int16_t channel, bool active); - - virtual void flowOk(u_int16_t channel, bool active); - - virtual void ok( u_int16_t channel ); - - virtual void ping( u_int16_t channel ); - - virtual void pong( u_int16_t channel ); - - virtual void resume( u_int16_t channel, - const string& channelId ); - - virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, - u_int16_t classId, u_int16_t methodId); - - virtual void closeOk(u_int16_t channel); - - virtual ~ChannelHandlerImpl(){} - }; - - class ExchangeHandlerImpl : public ExchangeHandler{ - SessionHandlerImpl* parent; - public: - inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - - virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type, - bool passive, bool durable, bool autoDelete, bool internal, bool nowait, - const qpid::framing::FieldTable& arguments); - - virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait); - - virtual void unbind(u_int16_t channel, - u_int16_t ticket, - const string& queue, - const string& exchange, - const string& routingKey, - const qpid::framing::FieldTable& arguments ); - - virtual ~ExchangeHandlerImpl(){} - }; - - - class QueueHandlerImpl : public QueueHandler{ - SessionHandlerImpl* parent; - public: - inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - - virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments); - - virtual void bind(u_int16_t channel, u_int16_t ticket, const string& queue, - const string& exchange, const string& routingKey, bool nowait, - const qpid::framing::FieldTable& arguments); - - virtual void unbind(u_int16_t channel, - u_int16_t ticket, - const string& queue, - const string& exchange, - const string& routingKey, - const qpid::framing::FieldTable& arguments ); - - virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue, - bool nowait); - - // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 - virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue, bool ifUnused, bool ifEmpty, - bool nowait); - - virtual ~QueueHandlerImpl(){} - }; - - class BasicHandlerImpl : public BasicHandler{ - SessionHandlerImpl* parent; - public: - inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - - virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); - - virtual void consume( - u_int16_t channel, u_int16_t ticket, const string& queue, - const string& consumerTag, bool noLocal, bool noAck, - bool exclusive, bool nowait, - const qpid::framing::FieldTable& fields); - - virtual void cancel(u_int16_t channel, const string& consumerTag, bool nowait); - - virtual void publish(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& routingKey, - bool mandatory, bool immediate); - - virtual void get(u_int16_t channel, u_int16_t ticket, const string& queue, bool noAck); - - virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); - - virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); - - virtual void recover(u_int16_t channel, bool requeue); - - virtual ~BasicHandlerImpl(){} - }; - - class TxHandlerImpl : public TxHandler{ - SessionHandlerImpl* parent; - public: - TxHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - virtual ~TxHandlerImpl() {} - virtual void select(u_int16_t channel); - virtual void commit(u_int16_t channel); - virtual void rollback(u_int16_t channel); - }; - - class MessageHandlerImpl : public MessageHandler { - SessionHandlerImpl* parent; - - // Constructors and destructors - - public: - MessageHandlerImpl() {} - MessageHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - virtual ~MessageHandlerImpl() {} - - // Protocol methods - virtual void append( u_int16_t channel, - const string& reference, - const string& bytes ); - - virtual void cancel( u_int16_t channel, - const string& destination ); - - virtual void checkpoint( u_int16_t channel, - const string& reference, - const string& identifier ); - - virtual void close( u_int16_t channel, - const string& reference ); - - virtual void consume( u_int16_t channel, - u_int16_t ticket, - const string& queue, - const string& destination, - bool noLocal, - bool noAck, - bool exclusive, - const qpid::framing::FieldTable& filter ); - - virtual void empty( u_int16_t channel ); - - virtual void get( u_int16_t channel, - u_int16_t ticket, - const string& queue, - const string& destination, - bool noAck ); - - virtual void offset( u_int16_t channel, - u_int64_t value ); - - virtual void ok( u_int16_t channel ); - - virtual void open( u_int16_t channel, - const string& reference ); - - virtual void qos( u_int16_t channel, - u_int32_t prefetchSize, - u_int16_t prefetchCount, - bool global ); - - virtual void recover( u_int16_t channel, - bool requeue ); - - virtual void reject( u_int16_t channel, - u_int16_t code, - const string& text ); - - virtual void resume( u_int16_t channel, - const string& reference, - const string& identifier ); - - virtual void transfer( u_int16_t channel, - u_int16_t ticket, - const string& destination, - bool redelivered, - bool immediate, - u_int64_t ttl, - u_int8_t priority, - u_int64_t timestamp, - u_int8_t deliveryMode, - u_int64_t expiration, - const string& exchange, - const string& routingKey, - const string& messageId, - const string& correlationId, - const string& replyTo, - const string& contentType, - const string& contentEncoding, - const string& userId, - const string& appId, - const string& transactionId, - const string& securityToken, - const qpid::framing::FieldTable& applicationHeaders, - qpid::framing::Content body ); - }; - - virtual ChannelHandler* getChannelHandler(){ return channelHandler.get(); } - virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler.get(); } - virtual BasicHandler* getBasicHandler(){ return basicHandler.get(); } - virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler.get(); } - virtual QueueHandler* getQueueHandler(){ return queueHandler.get(); } - virtual TxHandler* getTxHandler(){ return txHandler.get(); } - virtual MessageHandler* getMessageHandler(){ return messageHandler.get(); } - - virtual AccessHandler* getAccessHandler(){ throw ConnectionException(540, "Access class not implemented"); } - virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); } - virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); } - virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); } - virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); } -}; - -}} - -#endif diff --git a/cpp/lib/common/Exception.h b/cpp/lib/common/Exception.h index f35d427bb0..61bbc0ab5f 100644 --- a/cpp/lib/common/Exception.h +++ b/cpp/lib/common/Exception.h @@ -54,6 +54,17 @@ class Exception : public std::exception typedef boost::shared_ptr shared_ptr; }; +struct ChannelException : public qpid::Exception { + u_int16_t code; + ChannelException(u_int16_t _code, std::string _text) + : Exception(_text), code(_code) {} +}; + +struct ConnectionException : public qpid::Exception { + u_int16_t code; + ConnectionException(u_int16_t _code, std::string _text) + : Exception(_text), code(_code) {} +}; } diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am index 541145ac97..813c49135e 100644 --- a/cpp/lib/common/Makefile.am +++ b/cpp/lib/common/Makefile.am @@ -120,8 +120,8 @@ nobase_pkginclude_HEADERS = \ sys/Mutex.h \ sys/Runnable.h \ sys/SessionContext.h \ - sys/SessionHandler.h \ - sys/SessionHandlerFactory.h \ + sys/ConnectionInputHandler.h \ + sys/ConnectionInputHandlerFactory.h \ sys/ShutdownHandler.h \ sys/Socket.h \ sys/Thread.h \ diff --git a/cpp/lib/common/framing/Requester.cpp b/cpp/lib/common/framing/Requester.cpp index 1dd3cd4ce9..7e1da505c6 100644 --- a/cpp/lib/common/framing/Requester.cpp +++ b/cpp/lib/common/framing/Requester.cpp @@ -33,13 +33,15 @@ void Requester::sending(AMQRequestBody::Data& request) { void Requester::processed(const AMQResponseBody::Data& response) { responseMark = response.responseId; RequestId id = response.requestId; - RequestId end = id + response.batchOffset; + RequestId end = id + response.batchOffset + 1; for ( ; id < end; ++id) { std::set::iterator i = requests.find(id); - if (i == requests.end()) - // TODO aconway 2007-01-12: Verify this is the right exception. - THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid response."); - requests.erase(i); + if (i != requests.end()) + requests.erase(i); + else { + // FIXME aconway 2007-01-16: Uncomment exception when ids are propagating. +// THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid response."); + } } } diff --git a/cpp/lib/common/sys/Acceptor.h b/cpp/lib/common/sys/Acceptor.h index e6bc27a593..f571dcbddd 100644 --- a/cpp/lib/common/sys/Acceptor.h +++ b/cpp/lib/common/sys/Acceptor.h @@ -28,7 +28,7 @@ namespace qpid { namespace sys { -class SessionHandlerFactory; +class ConnectionInputHandlerFactory; class Acceptor : public qpid::SharedObject { @@ -36,7 +36,7 @@ class Acceptor : public qpid::SharedObject static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false); virtual ~Acceptor() = 0; virtual int16_t getPort() const = 0; - virtual void run(qpid::sys::SessionHandlerFactory* factory) = 0; + virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory) = 0; virtual void shutdown() = 0; }; diff --git a/cpp/lib/common/sys/ConnectionInputHandler.h b/cpp/lib/common/sys/ConnectionInputHandler.h new file mode 100644 index 0000000000..fa70dfaf48 --- /dev/null +++ b/cpp/lib/common/sys/ConnectionInputHandler.h @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ConnectionInputHandler_ +#define _ConnectionInputHandler_ + +#include +#include +#include +#include + +namespace qpid { +namespace sys { + + class ConnectionInputHandler : + public qpid::framing::InitiationHandler, + public qpid::framing::InputHandler, + public TimeoutHandler + { + public: + virtual void closed() = 0; + }; + +} +} + + +#endif diff --git a/cpp/lib/common/sys/ConnectionInputHandlerFactory.h b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h new file mode 100644 index 0000000000..5bb5e17704 --- /dev/null +++ b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ConnectionInputHandlerFactory_ +#define _ConnectionInputHandlerFactory_ + +#include + +namespace qpid { +namespace sys { + +class SessionContext; +class ConnectionInputHandler; + +/** + * Callback interface used by the Acceptor to + * create a ConnectionInputHandler for each new connection. + */ +class ConnectionInputHandlerFactory : private boost::noncopyable +{ + public: + virtual ConnectionInputHandler* create(SessionContext* ctxt) = 0; + virtual ~ConnectionInputHandlerFactory(){} +}; + +}} + + +#endif diff --git a/cpp/lib/common/sys/SessionHandler.h b/cpp/lib/common/sys/SessionHandler.h deleted file mode 100644 index 76f79d421d..0000000000 --- a/cpp/lib/common/sys/SessionHandler.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _SessionHandler_ -#define _SessionHandler_ - -#include -#include -#include -#include - -namespace qpid { -namespace sys { - - class SessionHandler : - public qpid::framing::InitiationHandler, - public qpid::framing::InputHandler, - public TimeoutHandler - { - public: - virtual void closed() = 0; - }; - -} -} - - -#endif diff --git a/cpp/lib/common/sys/SessionHandlerFactory.h b/cpp/lib/common/sys/SessionHandlerFactory.h deleted file mode 100644 index 2a01aebcb0..0000000000 --- a/cpp/lib/common/sys/SessionHandlerFactory.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _SessionHandlerFactory_ -#define _SessionHandlerFactory_ - -#include - -namespace qpid { -namespace sys { - -class SessionContext; -class SessionHandler; - -/** - * Callback interface used by the Acceptor to - * create a SessionHandler for each new connection. - */ -class SessionHandlerFactory : private boost::noncopyable -{ - public: - virtual SessionHandler* create(SessionContext* ctxt) = 0; - virtual ~SessionHandlerFactory(){} -}; - -}} - - -#endif diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp index 6853833797..10f787f4fe 100644 --- a/cpp/lib/common/sys/apr/APRAcceptor.cpp +++ b/cpp/lib/common/sys/apr/APRAcceptor.cpp @@ -19,7 +19,7 @@ * */ #include -#include +#include #include "LFProcessor.h" #include "LFSessionContext.h" #include "APRBase.h" @@ -33,7 +33,7 @@ class APRAcceptor : public Acceptor public: APRAcceptor(int16_t port, int backlog, int threads, bool trace); virtual int16_t getPort() const; - virtual void run(qpid::sys::SessionHandlerFactory* factory); + virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory); virtual void shutdown(); private: @@ -75,7 +75,7 @@ int16_t APRAcceptor::getPort() const { return address->port; } -void APRAcceptor::run(SessionHandlerFactory* factory) { +void APRAcceptor::run(ConnectionInputHandlerFactory* factory) { running = true; processor.start(); std::cout << "Listening on port " << getPort() << "..." << std::endl; diff --git a/cpp/lib/common/sys/apr/LFSessionContext.cpp b/cpp/lib/common/sys/apr/LFSessionContext.cpp index 7fb8d5a91b..43fc3de3dd 100644 --- a/cpp/lib/common/sys/apr/LFSessionContext.cpp +++ b/cpp/lib/common/sys/apr/LFSessionContext.cpp @@ -160,7 +160,7 @@ void LFSessionContext::shutdown(){ handleClose(); } -void LFSessionContext::init(SessionHandler* _handler){ +void LFSessionContext::init(ConnectionInputHandler* _handler){ handler = _handler; processor->add(&fd); } diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h index 9483cbe590..8cf50b87ba 100644 --- a/cpp/lib/common/sys/apr/LFSessionContext.h +++ b/cpp/lib/common/sys/apr/LFSessionContext.h @@ -31,7 +31,7 @@ #include #include #include -#include +#include #include "APRSocket.h" #include "LFProcessor.h" @@ -49,7 +49,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext qpid::framing::Buffer in; qpid::framing::Buffer out; - qpid::sys::SessionHandler* handler; + qpid::sys::ConnectionInputHandler* handler; LFProcessor* const processor; apr_pollfd_t fd; @@ -74,7 +74,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext virtual void close(); void read(); void write(); - void init(qpid::sys::SessionHandler* handler); + void init(qpid::sys::ConnectionInputHandler* handler); void startProcessing(); void stopProcessing(); void handleClose(); diff --git a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp index 7cd6f60902..787d12d6d1 100644 --- a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp +++ b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp @@ -27,8 +27,8 @@ #include #include -#include -#include +#include +#include #include #include #include @@ -53,7 +53,7 @@ class EventChannelAcceptor : public Acceptor { int getPort() const; - void run(SessionHandlerFactory& factory); + void run(ConnectionInputHandlerFactory& factory); void shutdown(); @@ -68,7 +68,7 @@ class EventChannelAcceptor : public Acceptor { bool isRunning; boost::ptr_vector connections; AcceptEvent acceptEvent; - SessionHandlerFactory* factory; + ConnectionInputHandlerFactory* factory; bool isShutdown; EventChannelThreads::shared_ptr threads; }; @@ -100,7 +100,7 @@ int EventChannelAcceptor::getPort() const { return port; // Immutable no need for lock. } -void EventChannelAcceptor::run(SessionHandlerFactory& f) { +void EventChannelAcceptor::run(ConnectionInputHandlerFactory& f) { { Mutex::ScopedLock l(lock); if (!isRunning && !isShutdown) { diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.cpp b/cpp/lib/common/sys/posix/EventChannelConnection.cpp index 196dde5af8..4449dc3035 100644 --- a/cpp/lib/common/sys/posix/EventChannelConnection.cpp +++ b/cpp/lib/common/sys/posix/EventChannelConnection.cpp @@ -22,7 +22,7 @@ #include #include "EventChannelConnection.h" -#include "sys/SessionHandlerFactory.h" +#include "sys/ConnectionInputHandlerFactory.h" #include "QpidError.h" using namespace std; @@ -36,7 +36,7 @@ const size_t EventChannelConnection::bufferSize = 65536; EventChannelConnection::EventChannelConnection( EventChannelThreads::shared_ptr threads_, - SessionHandlerFactory& factory_, + ConnectionInputHandlerFactory& factory_, int rfd, int wfd, bool isTrace_ diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.h b/cpp/lib/common/sys/posix/EventChannelConnection.h index bace045993..1504e92651 100644 --- a/cpp/lib/common/sys/posix/EventChannelConnection.h +++ b/cpp/lib/common/sys/posix/EventChannelConnection.h @@ -24,17 +24,17 @@ #include "EventChannelThreads.h" #include "sys/Monitor.h" #include "sys/SessionContext.h" -#include "sys/SessionHandler.h" +#include "sys/ConnectionInputHandler.h" #include "sys/AtomicCount.h" #include "framing/AMQFrame.h" namespace qpid { namespace sys { -class SessionHandlerFactory; +class ConnectionInputHandlerFactory; /** - * Implements SessionContext and delegates to a SessionHandler + * Implements SessionContext and delegates to a ConnectionInputHandler * for a connection via the EventChannel. *@param readDescriptor file descriptor for reading. *@param writeDescriptor file descriptor for writing, @@ -44,7 +44,7 @@ class EventChannelConnection : public SessionContext { public: EventChannelConnection( EventChannelThreads::shared_ptr threads, - SessionHandlerFactory& factory, + ConnectionInputHandlerFactory& factory, int readDescriptor, int writeDescriptor = 0, bool isTrace = false @@ -86,7 +86,7 @@ class EventChannelConnection : public SessionContext { AtomicCount busyThreads; EventChannelThreads::shared_ptr threads; - std::auto_ptr handler; + std::auto_ptr handler; qpid::framing::Buffer in, out; FrameQueue writeFrames; bool isTrace; diff --git a/cpp/lib/common/sys/posix/PosixAcceptor.cpp b/cpp/lib/common/sys/posix/PosixAcceptor.cpp index 842aa76f36..a80a6c61f7 100644 --- a/cpp/lib/common/sys/posix/PosixAcceptor.cpp +++ b/cpp/lib/common/sys/posix/PosixAcceptor.cpp @@ -32,7 +32,7 @@ void fail() { throw qpid::Exception("PosixAcceptor not implemented"); } class PosixAcceptor : public Acceptor { public: virtual int16_t getPort() const { fail(); return 0; } - virtual void run(qpid::sys::SessionHandlerFactory* ) { fail(); } + virtual void run(qpid::sys::ConnectionInputHandlerFactory* ) { fail(); } virtual void shutdown() { fail(); } }; -- cgit v1.2.1