From ae143d9fc55304eb6fa9660938fb4069bab57c03 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Thu, 17 Apr 2008 00:19:14 +0000 Subject: Refactored IO Thread creation so that it happens in the Broker class - There is now a single Poller created by the Broker class that is passed to the Acceptor for use in network IO. It can also now be passed to anything else that wants to put work in the IO threads - The Broker class itself is now responsible for actually creating the threads git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@648904 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Broker.cpp | 34 ++++++++++++++--- qpid/cpp/src/qpid/broker/Broker.h | 5 +++ qpid/cpp/src/qpid/sys/Acceptor.h | 14 ++++--- qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 60 +++++++++--------------------- 4 files changed, 58 insertions(+), 55 deletions(-) (limited to 'qpid/cpp') diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 689fb0687c..9773b49b93 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -37,6 +37,9 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Acceptor.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Thread.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/TimeoutHandler.h" @@ -53,6 +56,9 @@ #endif using qpid::sys::Acceptor; +using qpid::sys::Poller; +using qpid::sys::Dispatcher; +using qpid::sys::Thread; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; using qpid::management::ManagementAgent; @@ -121,6 +127,7 @@ const std::string amq_match("amq.match"); const std::string qpid_management("qpid.management"); Broker::Broker(const Broker::Options& conf) : + poller(new Poller), config(conf), store(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), @@ -253,15 +260,31 @@ void Broker::setStore (MessageStore* _store) } void Broker::run() { - getAcceptor().run(&factory); + + getAcceptor().run(poller, &factory); + + Dispatcher d(poller); + int numIOThreads = config.workerThreads; + std::vector t(numIOThreads-1); + + // Run n-1 io threads + for (int i=0; ishutdown(); + poller->shutdown(); } Broker::~Broker() { @@ -281,8 +304,7 @@ Acceptor& Broker::getAcceptor() const { if (!acceptor) { const_cast(acceptor) = Acceptor::create(config.port, - config.connectionBacklog, - config.workerThreads); + config.connectionBacklog); QPID_LOG(info, "Listening on port " << getPort()); } return *acceptor; @@ -330,7 +352,7 @@ void Broker::connect( const std::string& host, uint16_t port, sys::ConnectionCodec::Factory* f) { - getAcceptor().connect(host, port, f ? f : &factory); + getAcceptor().connect(poller, host, port, f ? f : &factory); } void Broker::connect( diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index ea29348c16..49c4b203c4 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -50,6 +50,10 @@ namespace qpid { +namespace sys { + class Poller; +} + class Url; namespace broker { @@ -129,6 +133,7 @@ class Broker : public sys::Runnable, public Plugin::Target, private: sys::Acceptor& getAcceptor() const; + boost::shared_ptr poller; Options config; sys::Acceptor::shared_ptr acceptor; MessageStore* store; diff --git a/qpid/cpp/src/qpid/sys/Acceptor.h b/qpid/cpp/src/qpid/sys/Acceptor.h index 1e7827e60c..243e791eeb 100644 --- a/qpid/cpp/src/qpid/sys/Acceptor.h +++ b/qpid/cpp/src/qpid/sys/Acceptor.h @@ -26,22 +26,24 @@ #include "qpid/SharedObject.h" #include "ConnectionCodec.h" + namespace qpid { namespace sys { +class Poller; + class Acceptor : public qpid::SharedObject { public: - static Acceptor::shared_ptr create(int16_t port, int backlog, int threads); + static Acceptor::shared_ptr create(int16_t port, int backlog); virtual ~Acceptor() = 0; virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; - virtual void run(ConnectionCodec::Factory*) = 0; + virtual void run(boost::shared_ptr, ConnectionCodec::Factory*) = 0; virtual void connect( - const std::string& host, int16_t port, ConnectionCodec::Factory* codec) = 0; - - /** Note: this function is async-signal safe */ - virtual void shutdown() = 0; + boost::shared_ptr, + const std::string& host, int16_t port, + ConnectionCodec::Factory* codec) = 0; }; inline Acceptor::~Acceptor() {} diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 43fbfdf7be..5133fde183 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -43,19 +43,15 @@ namespace qpid { namespace sys { class AsynchIOAcceptor : public Acceptor { - Poller::shared_ptr poller; Socket listener; - int numIOThreads; const uint16_t listeningPort; + std::auto_ptr acceptor; public: - AsynchIOAcceptor(int16_t port, int backlog, int threads); - ~AsynchIOAcceptor() {} - void run(ConnectionCodec::Factory*); - void connect(const std::string& host, int16_t port, ConnectionCodec::Factory*); + AsynchIOAcceptor(int16_t port, int backlog); + void run(Poller::shared_ptr, ConnectionCodec::Factory*); + void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*); - void shutdown(); - uint16_t getPort() const; std::string getHost() const; @@ -63,15 +59,14 @@ class AsynchIOAcceptor : public Acceptor { void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); }; -Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads) +Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog) { - return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads)); + return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog)); } -AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) : - poller(new Poller), - numIOThreads(threads), - listeningPort(listener.listen(port, backlog)) +AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) : + listeningPort(listener.listen(port, backlog)), + acceptor(0) {} // Buffer definition @@ -157,30 +152,17 @@ std::string AsynchIOAcceptor::getHost() const { return listener.getSockname(); } -void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) { - Dispatcher d(poller); - AsynchAcceptor - acceptor(listener, - boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)); - acceptor.start(poller); - - std::vector t(numIOThreads-1); - - // Run n-1 io threads - for (int i=0; istart(poller); } void AsynchIOAcceptor::connect( - const std::string& host, int16_t port, ConnectionCodec::Factory* f) + Poller::shared_ptr poller, + const std::string& host, int16_t port, + ConnectionCodec::Factory* f) { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); @@ -202,14 +184,6 @@ void AsynchIOAcceptor::connect( aio->start(poller); } - -void AsynchIOAcceptor::shutdown() { - // NB: this function must be async-signal safe, it must not - // call any function that is not async-signal safe. - poller->shutdown(); -} - - void AsynchIOHandler::write(const framing::ProtocolInitiation& data) { QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")"); -- cgit v1.2.1