diff options
author | Andrew Stitcher <astitcher@apache.org> | 2008-09-11 04:45:26 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2008-09-11 04:45:26 +0000 |
commit | 468b4b6ddaa3d96bb743cdbd27ded651eea31847 (patch) | |
tree | f1987d223e56792386f884029664dae277bff754 | |
parent | 51ea054146d0af71640bd44e8e75494847216a31 (diff) | |
download | qpid-python-468b4b6ddaa3d96bb743cdbd27ded651eea31847.tar.gz |
Refactored c++ client library to allow multiple protocols to be
used simultaneously:
- Added in capability for client library plugins:
Client library will load in plugin modules from
the client library module directory on library load.
- Add protocol option into the standard client command line options
- Split plugin module load area into daemon and client;
default daemon module directory is now <libdir>/qpid/daemon,
default client module directory is <libdir>/qpid/client.
- Changed names of plugins to leave out libqpid prefix
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@694113 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/etc/Makefile.am | 4 | ||||
-rw-r--r-- | cpp/etc/qpidc.conf | 2 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 39 | ||||
-rw-r--r-- | cpp/src/acl.mk | 9 | ||||
-rw-r--r-- | cpp/src/cluster.mk | 10 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connection.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionSettings.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionSettings.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 206 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.h | 114 | ||||
-rw-r--r-- | cpp/src/qpid/client/LoadPlugins.cpp | 95 | ||||
-rw-r--r-- | cpp/src/qpidd.cpp | 13 | ||||
-rw-r--r-- | cpp/src/tests/ConnectionOptions.h | 1 | ||||
-rw-r--r-- | cpp/src/tests/cluster.mk | 2 |
16 files changed, 347 insertions, 174 deletions
diff --git a/cpp/etc/Makefile.am b/cpp/etc/Makefile.am index f683459cd4..d3dd67d35e 100644 --- a/cpp/etc/Makefile.am +++ b/cpp/etc/Makefile.am @@ -4,6 +4,10 @@ EXTRA_DIST = \ $(SASL_CONF) \ qpidd qpidd.conf +confdir=$(sysconfdir)/qpid +nobase_conf_DATA=\ + qpidc.conf + nobase_sysconf_DATA = \ qpidd.conf diff --git a/cpp/etc/qpidc.conf b/cpp/etc/qpidc.conf new file mode 100644 index 0000000000..065442399e --- /dev/null +++ b/cpp/etc/qpidc.conf @@ -0,0 +1,2 @@ +# Configuration file for the qpid client library. Entries are of the form: +# name = value diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index f0d1ea04c9..a5e4527c30 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -51,12 +51,13 @@ BUILT_SOURCES=qpid/framing/MaxMethodBodySize.h DISTCLEANFILES=qpid/framing/MaxMethodBodySize.h ## Compiler flags - AM_CXXFLAGS = $(WARNING_CFLAGS) AM_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG) INCLUDES = -Igen -I$(srcdir)/gen ## Automake macros to build libraries and executables. +qpidd_CXXFLAGS = $(AM_CXXFLAGS) -DMODULE_DIR=\"$(dmoduledir)\" -DCONF_FILE=\"$(sysconfdir)/qpidd.conf\" +libqpidclient_la_CXXFLAGS = $(AM_CXXFLAGS) -DMODULE_DIR=\"$(cmoduledir)\" -DCONF_FILE=\"$(confdir)/qpidc.conf\" qpidd_LDADD = \ libqpidbroker.la \ @@ -95,7 +96,15 @@ endif platform_src = $(posix_plat_src) $(poller) platform_hdr = $(posix_plat_hdr) -lib_LTLIBRARIES = libqpidcommon.la libqpidbroker.la libqpidclient.la +lib_LTLIBRARIES = libqpidcommon.la libqpidbroker.la libqpidclient.la + +# Definitions for client and daemon plugins +PLUGINLDFLAGS=-no-undefined -module -avoid-version +confdir=$(sysconfdir)/qpid +dmoduledir=$(libdir)/qpid/daemon +cmoduledir=$(libdir)/qpid/client +dmodule_LTLIBRARIES = +cmodule_LTLIBRARIES = include cluster.mk include acl.mk @@ -113,33 +122,35 @@ libLogger_la_CXXFLAGS=$(AM_CXXFLAGS) -Wno-unused-parameter if RDMA # RDMA (Infiniband) protocol code -libqpidrdma_la_SOURCES = \ +librdmawrap_la_SOURCES = \ qpid/sys/rdma/rdma_exception.h \ qpid/sys/rdma/rdma_factories.cpp \ qpid/sys/rdma/RdmaIO.cpp \ qpid/sys/rdma/RdmaIO.h \ qpid/sys/rdma/rdma_wrap.h -libqpidrdma_la_LIBADD = \ +librdmawrap_la_LIBADD = \ + libqpidcommon.la \ -lrdmacm \ -libverbs -libqpidrdma_la_CXXFLAGS = \ +librdmawrap_la_CXXFLAGS = \ $(AM_CXXFLAGS) -Wno-missing-field-initializers -noinst_LTLIBRARIES += \ - libqpidrdma.la -qpidd_LDADD += \ - libqpidrdma.la +lib_LTLIBRARIES += \ + librdmawrap.la +librdmawrap_la_LDFLAGS = \ + -no-undefined +# RDMA test/sample programs noinst_PROGRAMS += RdmaServer RdmaClient RdmaServer_SOURCES = qpid/sys/rdma/RdmaServer.cpp RdmaServer_CXXFLAGS = \ $(AM_CXXFLAGS) -Wno-missing-field-initializers RdmaServer_LDADD = \ - libqpidrdma.la libqpidcommon.la + librdmawrap.la libqpidcommon.la RdmaClient_SOURCES = qpid/sys/rdma/RdmaClient.cpp RdmaClient_CXXFLAGS = \ $(AM_CXXFLAGS) -Wno-missing-field-initializers RdmaClient_LDADD = \ - libqpidrdma.la libqpidcommon.la + librdmawrap.la libqpidcommon.la endif @@ -329,16 +340,17 @@ libqpidclient_la_SOURCES = \ $(rgen_client_srcs) \ qpid/client/AckPolicy.cpp \ qpid/client/Bounds.cpp \ - qpid/client/ConnectionImpl.cpp \ - qpid/client/Connector.cpp \ qpid/client/Connection.cpp \ qpid/client/ConnectionHandler.cpp \ + qpid/client/ConnectionImpl.cpp \ qpid/client/ConnectionSettings.cpp \ + qpid/client/Connector.cpp \ qpid/client/Demux.cpp \ qpid/client/Dispatcher.cpp \ qpid/client/Future.cpp \ qpid/client/FutureCompletion.cpp \ qpid/client/FutureResult.cpp \ + qpid/client/LoadPlugins.cpp \ qpid/client/LocalQueue.cpp \ qpid/client/Message.cpp \ qpid/client/MessageListener.cpp \ @@ -579,7 +591,6 @@ if HAVE_XML nobase_include_HEADERS += qpid/broker/XmlExchange.h endif - # Force build of qpidd during dist phase so help2man will work. dist-hook: $(BUILT_SOURCES) $(MAKE) qpidd diff --git a/cpp/src/acl.mk b/cpp/src/acl.mk index cad66adf1a..d31a883c24 100644 --- a/cpp/src/acl.mk +++ b/cpp/src/acl.mk @@ -1,9 +1,9 @@ # # acl library makefile fragment, to be included in Makefile.am # -lib_LTLIBRARIES += libqpidacl.la +dmodule_LTLIBRARIES += acl.la -libqpidacl_la_SOURCES = \ +acl_la_SOURCES = \ qpid/acl/Acl.cpp \ qpid/acl/Acl.h \ qpid/acl/AclData.cpp \ @@ -12,6 +12,5 @@ libqpidacl_la_SOURCES = \ qpid/acl/AclReader.cpp \ qpid/acl/AclReader.h -libqpidacl_la_LIBADD = libqpidbroker.la - - +acl_la_LIBADD= libqpidbroker.la +acl_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 334cc907e3..bb9546f387 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -1,11 +1,11 @@ # # Cluster library makefile fragment, to be included in Makefile.am # -lib_LTLIBRARIES += libqpidcluster.la +dmodule_LTLIBRARIES += cluster.la if CPG -libqpidcluster_la_SOURCES = \ +cluster_la_SOURCES = \ qpid/cluster/types.h \ qpid/cluster/Cluster.cpp \ qpid/cluster/Cluster.h \ @@ -31,10 +31,12 @@ libqpidcluster_la_SOURCES = \ qpid/cluster/DumpClient.h \ qpid/cluster/DumpClient.cpp -libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la +cluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la else # Empty stub library to satisfy rpm spec file. -libqpidcluster_la_SOURCES = +cluster_la_SOURCES = endif + +cluster_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index e19cfea059..b8b4f9ccf3 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -105,7 +105,7 @@ void Connection::open(const ConnectionSettings& settings) throw Exception(QPID_MSG("Connection::open() was already called")); impl = shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings)); - impl->open(settings.host, settings.port); + impl->open(); max_frame_size = impl->getNegotiatedSettings().maxFrameSize; } diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index f623530f98..f180c4f23e 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -40,7 +40,6 @@ using namespace qpid::framing::connection;//for connection error codes ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), - connector(new Connector(v, settings, this)), version(v) { QPID_LOG(debug, "ConnectionImpl created for " << version); @@ -48,8 +47,6 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti handler.out = boost::bind(&Connector::send, boost::ref(connector), _1); handler.onClose = boost::bind(&ConnectionImpl::closed, this, CLOSE_CODE_NORMAL, std::string()); - connector->setInputHandler(&handler); - connector->setShutdownHandler(this); //only set error handler once open handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2); @@ -59,7 +56,7 @@ ConnectionImpl::~ConnectionImpl() { // Important to close the connector first, to ensure the // connector thread does not call on us while the destructor // is running. - connector->close(); + if (connector) connector->close(); } void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session) @@ -93,9 +90,16 @@ bool ConnectionImpl::isOpen() const } -void ConnectionImpl::open(const std::string& host, int port) +void ConnectionImpl::open() { - QPID_LOG(info, "Connecting to " << host << ":" << port); + const std::string& protocol = handler.protocol; + const std::string& host = handler.host; + int port = handler.port; + QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port); + + connector.reset(Connector::create(protocol, version, handler, this)); + connector->setInputHandler(&handler); + connector->setShutdownHandler(this); connector->connect(host, port); connector->init(); handler.waitForOpen(); diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index cd0788d68b..aca26b963d 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -69,7 +69,7 @@ class ConnectionImpl : public Bounds, ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings); ~ConnectionImpl(); - void open(const std::string& host, int port); + void open(); bool isOpen() const; void addSession(const boost::shared_ptr<SessionImpl>&); diff --git a/cpp/src/qpid/client/ConnectionSettings.cpp b/cpp/src/qpid/client/ConnectionSettings.cpp index 6bc220cd41..751f867c1e 100644 --- a/cpp/src/qpid/client/ConnectionSettings.cpp +++ b/cpp/src/qpid/client/ConnectionSettings.cpp @@ -28,6 +28,7 @@ namespace qpid { namespace client { ConnectionSettings::ConnectionSettings() : + protocol("tcp"), host("localhost"), port(TcpAddress::DEFAULT_PORT), username("guest"), diff --git a/cpp/src/qpid/client/ConnectionSettings.h b/cpp/src/qpid/client/ConnectionSettings.h index 5e93b3103e..0d7f9eb848 100644 --- a/cpp/src/qpid/client/ConnectionSettings.h +++ b/cpp/src/qpid/client/ConnectionSettings.h @@ -52,6 +52,11 @@ struct ConnectionSettings { virtual void configureSocket(qpid::sys::Socket&) const; /** + * The protocol used for the connection (defaults to 'tcp') + */ + std::string protocol; + + /** * The host (or ip address) to connect to (defaults to 'localhost'). */ std::string host; diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index f4f414bc63..6449088f92 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -32,6 +32,7 @@ #include "qpid/Msg.h" #include <iostream> +#include <map> #include <boost/bind.hpp> #include <boost/format.hpp> @@ -43,7 +44,135 @@ using namespace qpid::framing; using boost::format; using boost::str; -Connector::Connector(ProtocolVersion ver, +// Stuff for the registry of protocol connectors (maybe should be moved to its own file) +namespace { + typedef std::map<std::string, Connector::Factory*> ProtocolRegistry; + + ProtocolRegistry& theProtocolRegistry() { + static ProtocolRegistry protocolRegistry; + + return protocolRegistry; + } +} + +Connector* Connector::create(const std::string& proto, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) +{ + ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto); + if (i==theProtocolRegistry().end()) { + throw Exception(QPID_MSG("Unknown protocol: " << proto)); + } + return (i->second)(v, s, c); +} + +void Connector::registerFactory(const std::string& proto, Factory* connectorFactory) +{ + ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto); + if (i!=theProtocolRegistry().end()) { + QPID_LOG(error, "Tried to register protocol: " << proto << " more than once"); + } + theProtocolRegistry()[proto] = connectorFactory; +} + +class TCPConnector : public Connector, private sys::Runnable +{ + struct Buff; + + /** Batch up frames for writing to aio. */ + class Writer : public framing::FrameHandler { + typedef sys::AsynchIOBufferBase BufferBase; + typedef std::vector<framing::AMQFrame> Frames; + + const uint16_t maxFrameSize; + sys::Mutex lock; + sys::AsynchIO* aio; + BufferBase* buffer; + Frames frames; + size_t lastEof; // Position after last EOF in frames + framing::Buffer encode; + size_t framesEncoded; + std::string identifier; + Bounds* bounds; + + void writeOne(); + void newBuffer(); + + public: + + Writer(uint16_t maxFrameSize, Bounds*); + ~Writer(); + void init(std::string id, sys::AsynchIO*); + void handle(framing::AMQFrame&); + void write(sys::AsynchIO&); + }; + + const uint16_t maxFrameSize; + framing::ProtocolVersion version; + bool initiated; + + sys::Mutex closedLock; + bool closed; + bool joined; + + sys::ShutdownHandler* shutdownHandler; + framing::InputHandler* input; + framing::InitiationHandler* initialiser; + framing::OutputHandler* output; + + Writer writer; + + sys::Thread receiver; + + sys::Socket socket; + + sys::AsynchIO* aio; + boost::shared_ptr<sys::Poller> poller; + + ~TCPConnector(); + + void run(); + void handleClosed(); + bool closeInternal(); + + void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); + void writebuff(qpid::sys::AsynchIO&); + void writeDataBlock(const framing::AMQDataBlock& data); + void eof(qpid::sys::AsynchIO&); + + std::string identifier; + + ConnectionImpl* impl; + + void connect(const std::string& host, int port); + void init(); + void close(); + void send(framing::AMQFrame& frame); + + void setInputHandler(framing::InputHandler* handler); + void setShutdownHandler(sys::ShutdownHandler* handler); + sys::ShutdownHandler* getShutdownHandler() const; + framing::OutputHandler* getOutputHandler(); + const std::string& getIdentifier() const; + +public: + TCPConnector(framing::ProtocolVersion pVersion, + const ConnectionSettings&, + ConnectionImpl*); +}; + +// Static constructor which registers connector here +namespace { + Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new TCPConnector(v, s, c); + } + + struct StaticInit { + StaticInit() { + Connector::registerFactory("tcp", &create); + }; + } init; +} + +TCPConnector::TCPConnector(ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), @@ -51,23 +180,20 @@ Connector::Connector(ProtocolVersion ver, initiated(false), closed(true), joined(true), - timeout(0), - idleIn(0), idleOut(0), - timeoutHandler(0), shutdownHandler(0), writer(maxFrameSize, cimpl), aio(0), impl(cimpl) { - QPID_LOG(debug, "Connector created for " << version); + QPID_LOG(debug, "TCPConnector created for " << version); settings.configureSocket(socket); } -Connector::~Connector() { +TCPConnector::~TCPConnector() { close(); } -void Connector::connect(const std::string& host, int port){ +void TCPConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(closedLock); assert(closed); socket.connect(host, port); @@ -75,16 +201,16 @@ void Connector::connect(const std::string& host, int port){ closed = false; poller = Poller::shared_ptr(new Poller); aio = new AsynchIO(socket, - boost::bind(&Connector::readbuff, this, _1, _2), - boost::bind(&Connector::eof, this, _1), - boost::bind(&Connector::eof, this, _1), + boost::bind(&TCPConnector::readbuff, this, _1, _2), + boost::bind(&TCPConnector::eof, this, _1), + boost::bind(&TCPConnector::eof, this, _1), 0, // closed 0, // nobuffs - boost::bind(&Connector::writebuff, this, _1)); + boost::bind(&TCPConnector::writebuff, this, _1)); writer.init(identifier, aio); } -void Connector::init(){ +void TCPConnector::init(){ Mutex::ScopedLock l(closedLock); assert(joined); ProtocolInitiation init(version); @@ -93,7 +219,7 @@ void Connector::init(){ receiver = Thread(this); } -bool Connector::closeInternal() { +bool TCPConnector::closeInternal() { Mutex::ScopedLock l(closedLock); bool ret = !closed; if (!closed) { @@ -108,49 +234,57 @@ bool Connector::closeInternal() { return ret; } -void Connector::close() { +void TCPConnector::close() { closeInternal(); } -void Connector::setInputHandler(InputHandler* handler){ +void TCPConnector::setInputHandler(InputHandler* handler){ input = handler; } -void Connector::setShutdownHandler(ShutdownHandler* handler){ +void TCPConnector::setShutdownHandler(ShutdownHandler* handler){ shutdownHandler = handler; } -OutputHandler* Connector::getOutputHandler(){ +OutputHandler* TCPConnector::getOutputHandler() { return this; } -void Connector::send(AMQFrame& frame) { +sys::ShutdownHandler* TCPConnector::getShutdownHandler() const { + return shutdownHandler; +} + +const std::string& TCPConnector::getIdentifier() const { + return identifier; +} + +void TCPConnector::send(AMQFrame& frame) { writer.handle(frame); } -void Connector::handleClosed() { +void TCPConnector::handleClosed() { if (closeInternal() && shutdownHandler) shutdownHandler->shutdown(); } -struct Connector::Buff : public AsynchIO::BufferBase { +struct TCPConnector::Buff : public AsynchIO::BufferBase { Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} ~Buff() { delete [] bytes;} }; -Connector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) +TCPConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) { } -Connector::Writer::~Writer() { delete buffer; } +TCPConnector::Writer::~Writer() { delete buffer; } -void Connector::Writer::init(std::string id, sys::AsynchIO* a) { +void TCPConnector::Writer::init(std::string id, sys::AsynchIO* a) { Mutex::ScopedLock l(lock); identifier = id; aio = a; - newBuffer(l); + newBuffer(); } -void Connector::Writer::handle(framing::AMQFrame& frame) { +void TCPConnector::Writer::handle(framing::AMQFrame& frame) { Mutex::ScopedLock l(lock); frames.push_back(frame); if (frame.getEof()) {//or if we already have a buffers worth @@ -160,17 +294,17 @@ void Connector::Writer::handle(framing::AMQFrame& frame) { QPID_LOG(trace, "SENT " << identifier << ": " << frame); } -void Connector::Writer::writeOne(const Mutex::ScopedLock& l) { +void TCPConnector::Writer::writeOne() { assert(buffer); framesEncoded = 0; buffer->dataStart = 0; buffer->dataCount = encode.getPosition(); aio->queueWrite(buffer); - newBuffer(l); + newBuffer(); } -void Connector::Writer::newBuffer(const Mutex::ScopedLock&) { +void TCPConnector::Writer::newBuffer() { buffer = aio->getQueuedBuffer(); if (!buffer) buffer = new Buff(maxFrameSize); encode = framing::Buffer(buffer->bytes, buffer->byteCount); @@ -178,14 +312,14 @@ void Connector::Writer::newBuffer(const Mutex::ScopedLock&) { } // Called in IO thread. -void Connector::Writer::write(sys::AsynchIO&) { +void TCPConnector::Writer::write(sys::AsynchIO&) { Mutex::ScopedLock l(lock); assert(buffer); size_t bytesWritten(0); for (size_t i = 0; i < lastEof; ++i) { AMQFrame& frame = frames[i]; uint32_t size = frame.size(); - if (size > encode.available()) writeOne(l); + if (size > encode.available()) writeOne(); assert(size <= encode.available()); frame.encode(encode); ++framesEncoded; @@ -194,10 +328,10 @@ void Connector::Writer::write(sys::AsynchIO&) { frames.erase(frames.begin(), frames.begin()+lastEof); lastEof = 0; if (bounds) bounds->reduce(bytesWritten); - if (encode.getPosition() > 0) writeOne(l); + if (encode.getPosition() > 0) writeOne(); } -void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { +void TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); if (!initiated) { @@ -226,11 +360,11 @@ void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { } } -void Connector::writebuff(AsynchIO& aio_) { +void TCPConnector::writebuff(AsynchIO& aio_) { writer.write(aio_); } -void Connector::writeDataBlock(const AMQDataBlock& data) { +void TCPConnector::writeDataBlock(const AMQDataBlock& data) { AsynchIO::BufferBase* buff = new Buff(maxFrameSize); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); @@ -238,13 +372,13 @@ void Connector::writeDataBlock(const AMQDataBlock& data) { aio->queueWrite(buff); } -void Connector::eof(AsynchIO&) { +void TCPConnector::eof(AsynchIO&) { handleClosed(); } // TODO: astitcher 20070908 This version of the code can never time out, so the idle processing // will never be called -void Connector::run(){ +void TCPConnector::run(){ // Keep the connection impl in memory until run() completes. boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); assert(protect); diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index cde12f7b5b..39e29a33be 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -40,109 +40,31 @@ #include <boost/shared_ptr.hpp> namespace qpid { - -namespace sys { -class Poller; -class AsynchIO; -class AsynchIOBufferBase; -} - namespace client { -class Bounds; class ConnectionSettings; class ConnectionImpl; ///@internal -class Connector : public framing::OutputHandler, - private sys::Runnable -{ - struct Buff; - - /** Batch up frames for writing to aio. */ - class Writer : public framing::FrameHandler { - typedef sys::AsynchIOBufferBase BufferBase; - typedef std::vector<framing::AMQFrame> Frames; - - const uint16_t maxFrameSize; - sys::Mutex lock; - sys::AsynchIO* aio; - BufferBase* buffer; - Frames frames; - size_t lastEof; // Position after last EOF in frames - framing::Buffer encode; - size_t framesEncoded; - std::string identifier; - Bounds* bounds; - - void writeOne(const sys::Mutex::ScopedLock&); - void newBuffer(const sys::Mutex::ScopedLock&); - - public: - - Writer(uint16_t maxFrameSize, Bounds*); - ~Writer(); - void init(std::string id, sys::AsynchIO*); - void handle(framing::AMQFrame&); - void write(sys::AsynchIO&); - }; - - const uint16_t maxFrameSize; - framing::ProtocolVersion version; - bool initiated; - - sys::Mutex closedLock; - bool closed; - bool joined; - - sys::AbsTime lastIn; - sys::AbsTime lastOut; - sys::Duration timeout; - sys::Duration idleIn; - sys::Duration idleOut; - - sys::TimeoutHandler* timeoutHandler; - sys::ShutdownHandler* shutdownHandler; - framing::InputHandler* input; - framing::InitiationHandler* initialiser; - framing::OutputHandler* output; - - Writer writer; - - sys::Thread receiver; - - sys::Socket socket; - - sys::AsynchIO* aio; - boost::shared_ptr<sys::Poller> poller; - - void run(); - void handleClosed(); - bool closeInternal(); - - void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); - void writebuff(qpid::sys::AsynchIO&); - void writeDataBlock(const framing::AMQDataBlock& data); - void eof(qpid::sys::AsynchIO&); - - std::string identifier; - - ConnectionImpl* impl; - +class Connector : public framing::OutputHandler +{ public: - Connector(framing::ProtocolVersion pVersion, - const ConnectionSettings&, - ConnectionImpl*); - virtual ~Connector(); - virtual void connect(const std::string& host, int port); - virtual void init(); - virtual void close(); - virtual void setInputHandler(framing::InputHandler* handler); - virtual void setShutdownHandler(sys::ShutdownHandler* handler); - virtual sys::ShutdownHandler* getShutdownHandler() { return shutdownHandler; } - virtual framing::OutputHandler* getOutputHandler(); - virtual void send(framing::AMQFrame& frame); - const std::string& getIdentifier() const { return identifier; } + // Protocol connector factory related stuff (it might be better to separate this code from the TCP Connector in the future) + typedef Connector* Factory(framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*); + static Connector* create(const std::string& proto, framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*); + static void registerFactory(const std::string& proto, Factory* connectorFactory); + + virtual ~Connector() {}; + virtual void connect(const std::string& host, int port) = 0; + virtual void init() {}; + virtual void close() = 0; + virtual void send(framing::AMQFrame& frame) = 0; + + virtual void setInputHandler(framing::InputHandler* handler) = 0; + virtual void setShutdownHandler(sys::ShutdownHandler* handler) = 0; + virtual sys::ShutdownHandler* getShutdownHandler() const = 0; + virtual framing::OutputHandler* getOutputHandler() = 0; + virtual const std::string& getIdentifier() const = 0; }; }} diff --git a/cpp/src/qpid/client/LoadPlugins.cpp b/cpp/src/qpid/client/LoadPlugins.cpp new file mode 100644 index 0000000000..ddfa179c07 --- /dev/null +++ b/cpp/src/qpid/client/LoadPlugins.cpp @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/log/Statement.h" +#include "qpid/log/Options.h" +#include "qpid/log/Logger.h" +#include "qpid/sys/Shlib.h" +#include <boost/filesystem/operations.hpp> +#include <boost/filesystem/path.hpp> + +using namespace qpid; +using namespace qpid::sys; +using namespace qpid::log; +using namespace std; +namespace fs=boost::filesystem; + +struct ModuleOptions : public qpid::Options { + string loadDir; + vector<string> load; + bool noLoad; + ModuleOptions() : qpid::Options("Module options"), loadDir(MODULE_DIR), noLoad(false) + { + addOptions() + ("module-dir", optValue(loadDir, "DIR"), "Load all .so modules in this directory") + ("load-module", optValue(load, "FILE"), "Specifies additional module(s) to be loaded") + ("no-module-dir", optValue(noLoad), "Don't load modules from module directory"); + } +}; + +// TODO: The following is copied from qpidd.cpp - it needs to be common code +void tryShlib(const char* libname, bool noThrow) { + try { + Shlib shlib(libname); + QPID_LOG (info, "Loaded Module: " << libname); + } + catch (const exception& e) { + if (!noThrow) + throw; + } +} + +void loadModuleDir (string dirname, bool isDefault) +{ + fs::path dirPath (dirname, fs::native); + + if (!fs::exists (dirPath)) + { + if (isDefault) + return; + throw Exception ("Directory not found: " + dirname); + } + + fs::directory_iterator endItr; + for (fs::directory_iterator itr (dirPath); itr != endItr; ++itr) + { + if (!fs::is_directory(*itr) && + itr->string().find (".so") == itr->string().length() - 3) + tryShlib (itr->string().data(), true); + } +} + +struct LoadtimeInitialise { + LoadtimeInitialise() { + ModuleOptions moduleOptions; + string defaultPath (moduleOptions.loadDir); + moduleOptions.parse (0, 0, CONF_FILE, true); + + for (vector<string>::iterator iter = moduleOptions.load.begin(); + iter != moduleOptions.load.end(); + iter++) + tryShlib (iter->data(), false); + + if (!moduleOptions.noLoad) { + bool isDefault = defaultPath == moduleOptions.loadDir; + loadModuleDir (moduleOptions.loadDir, isDefault); + } + } +} init; diff --git a/cpp/src/qpidd.cpp b/cpp/src/qpidd.cpp index e79875f964..4acfef332a 100644 --- a/cpp/src/qpidd.cpp +++ b/cpp/src/qpidd.cpp @@ -34,7 +34,6 @@ #include <fstream> #include <signal.h> #include <unistd.h> -#include <sys/utsname.h> using namespace qpid; using namespace qpid::broker; @@ -47,14 +46,8 @@ struct ModuleOptions : public qpid::Options { string loadDir; vector<string> load; bool noLoad; - ModuleOptions() : qpid::Options("Module options"), loadDir("/usr/lib/qpidd"), noLoad(false) + ModuleOptions() : qpid::Options("Module options"), loadDir(MODULE_DIR), noLoad(false) { - struct utsname _uname; - if (::uname(&_uname) == 0) { - if (string(_uname.machine) == "x86_64") - loadDir = "/usr/lib64/qpidd"; - } - addOptions() ("module-dir", optValue(loadDir, "DIR"), "Load all .so modules in this directory") ("load-module", optValue(load, "FILE"), "Specifies additional module(s) to be loaded") @@ -97,7 +90,7 @@ struct QpiddOptions : public qpid::Options { DaemonOptions daemon; qpid::log::Options log; - QpiddOptions(const char* argv0) : qpid::Options("Options"), common("", "/etc/qpidd.conf"), log(argv0) { + QpiddOptions(const char* argv0) : qpid::Options("Options"), common("", CONF_FILE), log(argv0) { add(common); add(module); add(broker); @@ -121,7 +114,7 @@ struct BootstrapOptions : public qpid::Options { ModuleOptions module; qpid::log::Options log; - BootstrapOptions(const char* argv0) : qpid::Options("Options"), common("", "/etc/qpidd.conf"), log(argv0) { + BootstrapOptions(const char* argv0) : qpid::Options("Options"), common("", CONF_FILE), log(argv0) { add(common); add(module); add(log); diff --git a/cpp/src/tests/ConnectionOptions.h b/cpp/src/tests/ConnectionOptions.h index 1579ac0b57..0130842668 100644 --- a/cpp/src/tests/ConnectionOptions.h +++ b/cpp/src/tests/ConnectionOptions.h @@ -37,6 +37,7 @@ struct ConnectionOptions : public qpid::Options, addOptions() ("broker,b", optValue(host, "HOST"), "Broker host to connect to") ("port,p", optValue(port, "PORT"), "Broker port to connect to") + ("protocol,P", optValue(protocol, "tcp|rdma"), "Protocol to use for broker connection") ("virtualhost,v", optValue(virtualhost, "VHOST"), "virtual host") ("username", optValue(username, "USER"), "user name for broker log in.") ("password", optValue(password, "PASSWORD"), "password for broker log in.") diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index 8b83e927ec..3454beb9bc 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -3,7 +3,7 @@ if CPG # Cluster tests makefile fragment, to be included in Makefile.am # -lib_cluster = $(abs_builddir)/../libqpidcluster.la +lib_cluster = $(abs_builddir)/../cluster.la # NOTE: Programs using the openais library must be run with gid=ais # You should do "newgrp ais" before running the tests to run these. |