diff options
author | Andrew Stitcher <astitcher@apache.org> | 2008-04-18 21:03:49 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2008-04-18 21:03:49 +0000 |
commit | df28afb19a10c804cd02696bd94b09fed1719478 (patch) | |
tree | 9aafbc5ee664b561257cebe8edd2c46955b0999d | |
parent | 8633240562fad63a493b319d12b9c663ca081c75 (diff) | |
download | qpid-python-df28afb19a10c804cd02696bd94b09fed1719478.tar.gz |
Refactored Acceptor code to allow multiple acceptors to be present in the broker
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@649689 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/Plugin.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/Plugin.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 47 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 18 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Acceptor.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/TCPIOPlugin.cpp | 29 |
6 files changed, 79 insertions, 34 deletions
diff --git a/cpp/src/qpid/Plugin.cpp b/cpp/src/qpid/Plugin.cpp index d38b53a56e..77627c3742 100644 --- a/cpp/src/qpid/Plugin.cpp +++ b/cpp/src/qpid/Plugin.cpp @@ -22,11 +22,20 @@ namespace qpid { -Plugin::Plugins Plugin::plugins; +namespace { +// This is a single threaded singleton implementation so +// it is important to be sure that the first use of this +// singleton is when the program is still single threaded +Plugin::Plugins& thePlugins() { + static Plugin::Plugins plugins; + + return plugins; +} +} Plugin::Plugin() { // Register myself. - plugins.push_back(this); + thePlugins().push_back(this); } Plugin::~Plugin() {} @@ -34,7 +43,7 @@ Plugin::~Plugin() {} Options* Plugin::getOptions() { return 0; } const Plugin::Plugins& Plugin::getPlugins() { - return plugins; + return thePlugins(); } } // namespace qpid diff --git a/cpp/src/qpid/Plugin.h b/cpp/src/qpid/Plugin.h index e040662866..7b01c83273 100644 --- a/cpp/src/qpid/Plugin.h +++ b/cpp/src/qpid/Plugin.h @@ -88,9 +88,6 @@ class Plugin : boost::noncopyable * Caller must not delete plugin pointers. */ static const Plugins& getPlugins(); - - private: - static Plugins plugins; }; } // namespace qpid diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 9773b49b93..c32a8f889e 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -260,8 +260,7 @@ void Broker::setStore (MessageStore* _store) } void Broker::run() { - - getAcceptor().run(poller, &factory); + accept(); Dispatcher d(poller); int numIOThreads = config.workerThreads; @@ -298,18 +297,6 @@ Broker::~Broker() { } } -uint16_t Broker::getPort() const { return getAcceptor().getPort(); } - -Acceptor& Broker::getAcceptor() const { - if (!acceptor) { - const_cast<Acceptor::shared_ptr&>(acceptor) = - Acceptor::create(config.port, - config.connectionBacklog); - QPID_LOG(info, "Listening on port " << getPort()); - } - return *acceptor; -} - ManagementObject::shared_ptr Broker::GetManagementObject(void) const { return dynamic_pointer_cast<ManagementObject> (mgmtObject); @@ -348,11 +335,41 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, return status; } +boost::shared_ptr<Acceptor> Broker::getAcceptor() const { + assert(acceptors.size() > 0); +#if 0 + if (!acceptor) { + const_cast<Acceptor::shared_ptr&>(acceptor) = + Acceptor::create(config.port, + config.connectionBacklog); + QPID_LOG(info, "Listening on port " << getPort()); + } +#endif + return acceptors[0]; +} + +void Broker::registerAccepter(Acceptor::shared_ptr acceptor) { + acceptors.push_back(acceptor); +} + +// TODO: This can only work if there is only one acceptor +uint16_t Broker::getPort() const { + return getAcceptor()->getPort(); +} + +// TODO: This should iterate over all acceptors +void Broker::accept() { + for (unsigned int i = 0; i < acceptors.size(); ++i) + acceptors[i]->run(poller, &factory); +} + + +// TODO: How to chose the acceptor to use for the connection void Broker::connect( const std::string& host, uint16_t port, sys::ConnectionCodec::Factory* f) { - getAcceptor().connect(poller, host, port, f ? f : &factory); + getAcceptor()->connect(poller, host, port, f ? f : &factory); } void Broker::connect( diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 49c4b203c4..02f34ff3ba 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -43,7 +43,6 @@ #include "qpid/framing/FrameHandler.h" #include "qpid/framing/OutputHandler.h" #include "qpid/framing/ProtocolInitiation.h" -#include "qpid/sys/Acceptor.h" #include "qpid/sys/Runnable.h" #include <vector> @@ -51,6 +50,7 @@ namespace qpid { namespace sys { + class Acceptor; class Poller; } @@ -124,6 +124,12 @@ class Broker : public sys::Runnable, public Plugin::Target, management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); + /** Add to the broker's acceptors */ + void registerAccepter(boost::shared_ptr<sys::Acceptor>); + + /** Accept connections */ + void accept(); + /** Create a connection to another broker. */ void connect(const std::string& host, uint16_t port, sys::ConnectionCodec::Factory* =0); @@ -131,11 +137,9 @@ class Broker : public sys::Runnable, public Plugin::Target, void connect(const Url& url, sys::ConnectionCodec::Factory* =0); private: - sys::Acceptor& getAcceptor() const; - - boost::shared_ptr<qpid::sys::Poller> poller; + boost::shared_ptr<sys::Poller> poller; Options config; - sys::Acceptor::shared_ptr acceptor; + std::vector< boost::shared_ptr<sys::Acceptor> > acceptors; MessageStore* store; DataDir dataDir; @@ -150,6 +154,10 @@ class Broker : public sys::Runnable, public Plugin::Target, Vhost::shared_ptr vhostObject; System::shared_ptr systemObject; + // TODO: There is no longer a single acceptor so the use of the following needs to be fixed + // For the present just return the first acceptor registered. + boost::shared_ptr<sys::Acceptor> getAcceptor() const; + void declareStandardExchange(const std::string& name, const std::string& type); }; diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h index 243e791eeb..69a6eb8d7c 100644 --- a/cpp/src/qpid/sys/Acceptor.h +++ b/cpp/src/qpid/sys/Acceptor.h @@ -35,7 +35,6 @@ class Poller; class Acceptor : public qpid::SharedObject<Acceptor> { public: - static Acceptor::shared_ptr create(int16_t port, int backlog); virtual ~Acceptor() = 0; virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index 779319b7e4..eb6bcb3dee 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -20,10 +20,13 @@ */ #include "Acceptor.h" - #include "AsynchIOHandler.h" #include "AsynchIO.h" +#include "qpid/Plugin.h" +#include "qpid/broker/Broker.h" +#include "qpid/log/Statement.h" + #include <boost/bind.hpp> #include <memory> @@ -47,14 +50,26 @@ class AsynchIOAcceptor : public Acceptor { void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); }; -Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog) -{ - return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog)); -} +// Static instance to initialise plugin +static class TCPIOPlugin : public Plugin { + void earlyInitialize(Target&) { + } + + void initialize(Target& target) { + + broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); + // Only provide to a Broker + if (broker) { + const broker::Broker::Options& opts = broker->getOptions(); + Acceptor::shared_ptr acceptor(new AsynchIOAcceptor(opts.port, opts.connectionBacklog)); + QPID_LOG(info, "Listening on TCP port " << acceptor->getPort()); + broker->registerAccepter(acceptor); + } + } +} acceptor; AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) : - listeningPort(listener.listen(port, backlog)), - acceptor(0) + listeningPort(listener.listen(port, backlog)) {} void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) { |