From 71d805b6086ae19ed774589c25702d791ee91cf2 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Thu, 17 Apr 2008 00:19:14 +0000 Subject: Refactored IO Thread creation so that it happens in the Broker class - There is now a single Poller created by the Broker class that is passed to the Acceptor for use in network IO. It can also now be passed to anything else that wants to put work in the IO threads - The Broker class itself is now responsible for actually creating the threads git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@648904 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/Acceptor.h | 14 ++++---- cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 60 ++++++++++------------------------- 2 files changed, 25 insertions(+), 49 deletions(-) (limited to 'cpp/src/qpid/sys') diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h index 1e7827e60c..243e791eeb 100644 --- a/cpp/src/qpid/sys/Acceptor.h +++ b/cpp/src/qpid/sys/Acceptor.h @@ -26,22 +26,24 @@ #include "qpid/SharedObject.h" #include "ConnectionCodec.h" + namespace qpid { namespace sys { +class Poller; + class Acceptor : public qpid::SharedObject { public: - static Acceptor::shared_ptr create(int16_t port, int backlog, int threads); + static Acceptor::shared_ptr create(int16_t port, int backlog); virtual ~Acceptor() = 0; virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; - virtual void run(ConnectionCodec::Factory*) = 0; + virtual void run(boost::shared_ptr, ConnectionCodec::Factory*) = 0; virtual void connect( - const std::string& host, int16_t port, ConnectionCodec::Factory* codec) = 0; - - /** Note: this function is async-signal safe */ - virtual void shutdown() = 0; + boost::shared_ptr, + const std::string& host, int16_t port, + ConnectionCodec::Factory* codec) = 0; }; inline Acceptor::~Acceptor() {} diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 43fbfdf7be..5133fde183 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -43,19 +43,15 @@ namespace qpid { namespace sys { class AsynchIOAcceptor : public Acceptor { - Poller::shared_ptr poller; Socket listener; - int numIOThreads; const uint16_t listeningPort; + std::auto_ptr acceptor; public: - AsynchIOAcceptor(int16_t port, int backlog, int threads); - ~AsynchIOAcceptor() {} - void run(ConnectionCodec::Factory*); - void connect(const std::string& host, int16_t port, ConnectionCodec::Factory*); + AsynchIOAcceptor(int16_t port, int backlog); + void run(Poller::shared_ptr, ConnectionCodec::Factory*); + void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*); - void shutdown(); - uint16_t getPort() const; std::string getHost() const; @@ -63,15 +59,14 @@ class AsynchIOAcceptor : public Acceptor { void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); }; -Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads) +Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog) { - return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads)); + return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog)); } -AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) : - poller(new Poller), - numIOThreads(threads), - listeningPort(listener.listen(port, backlog)) +AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) : + listeningPort(listener.listen(port, backlog)), + acceptor(0) {} // Buffer definition @@ -157,30 +152,17 @@ std::string AsynchIOAcceptor::getHost() const { return listener.getSockname(); } -void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) { - Dispatcher d(poller); - AsynchAcceptor - acceptor(listener, - boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)); - acceptor.start(poller); - - std::vector t(numIOThreads-1); - - // Run n-1 io threads - for (int i=0; istart(poller); } void AsynchIOAcceptor::connect( - const std::string& host, int16_t port, ConnectionCodec::Factory* f) + Poller::shared_ptr poller, + const std::string& host, int16_t port, + ConnectionCodec::Factory* f) { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); @@ -202,14 +184,6 @@ void AsynchIOAcceptor::connect( aio->start(poller); } - -void AsynchIOAcceptor::shutdown() { - // NB: this function must be async-signal safe, it must not - // call any function that is not async-signal safe. - poller->shutdown(); -} - - void AsynchIOHandler::write(const framing::ProtocolInitiation& data) { QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")"); -- cgit v1.2.1