From 52ffbbfef04ee479a341ff640cb2df9c41897963 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Tue, 22 Apr 2008 21:13:20 +0000 Subject: * Renamed the Acceptor class to be the ProtocolFactory class which better approximates its current behaviour * Slightly refactored TCPIOPlugin to better approximate how it would look when we implement a proper AsynchConnector git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@650657 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/Acceptor.h | 54 -------------------------------------- cpp/src/qpid/sys/ProtocolFactory.h | 54 ++++++++++++++++++++++++++++++++++++++ cpp/src/qpid/sys/TCPIOPlugin.cpp | 50 +++++++++++++++-------------------- 3 files changed, 75 insertions(+), 83 deletions(-) delete mode 100644 cpp/src/qpid/sys/Acceptor.h create mode 100644 cpp/src/qpid/sys/ProtocolFactory.h (limited to 'cpp/src/qpid/sys') diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h deleted file mode 100644 index 69a6eb8d7c..0000000000 --- a/cpp/src/qpid/sys/Acceptor.h +++ /dev/null @@ -1,54 +0,0 @@ -#ifndef _sys_Acceptor_h -#define _sys_Acceptor_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include -#include "qpid/SharedObject.h" -#include "ConnectionCodec.h" - - -namespace qpid { -namespace sys { - -class Poller; - -class Acceptor : public qpid::SharedObject -{ - public: - virtual ~Acceptor() = 0; - virtual uint16_t getPort() const = 0; - virtual std::string getHost() const = 0; - virtual void run(boost::shared_ptr, ConnectionCodec::Factory*) = 0; - virtual void connect( - boost::shared_ptr, - const std::string& host, int16_t port, - ConnectionCodec::Factory* codec) = 0; -}; - -inline Acceptor::~Acceptor() {} - -}} - - - -#endif /*!_sys_Acceptor_h*/ diff --git a/cpp/src/qpid/sys/ProtocolFactory.h b/cpp/src/qpid/sys/ProtocolFactory.h new file mode 100644 index 0000000000..5f80771e49 --- /dev/null +++ b/cpp/src/qpid/sys/ProtocolFactory.h @@ -0,0 +1,54 @@ +#ifndef _sys_ProtocolFactory_h +#define _sys_ProtocolFactory_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include "qpid/SharedObject.h" +#include "ConnectionCodec.h" + + +namespace qpid { +namespace sys { + +class Poller; + +class ProtocolFactory : public qpid::SharedObject +{ + public: + virtual ~ProtocolFactory() = 0; + virtual uint16_t getPort() const = 0; + virtual std::string getHost() const = 0; + virtual void accept(boost::shared_ptr, ConnectionCodec::Factory*) = 0; + virtual void connect( + boost::shared_ptr, + const std::string& host, int16_t port, + ConnectionCodec::Factory* codec) = 0; +}; + +inline ProtocolFactory::~ProtocolFactory() {} + +}} + + + +#endif //!_sys_ProtocolFactory_h diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index eb6bcb3dee..65ea380b07 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -19,7 +19,7 @@ * */ -#include "Acceptor.h" +#include "ProtocolFactory.h" #include "AsynchIOHandler.h" #include "AsynchIO.h" @@ -33,21 +33,21 @@ namespace qpid { namespace sys { -class AsynchIOAcceptor : public Acceptor { +class AsynchIOProtocolFactory : public ProtocolFactory { Socket listener; const uint16_t listeningPort; std::auto_ptr acceptor; public: - AsynchIOAcceptor(int16_t port, int backlog); - void run(Poller::shared_ptr, ConnectionCodec::Factory*); + AsynchIOProtocolFactory(int16_t port, int backlog); + void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*); uint16_t getPort() const; std::string getHost() const; private: - void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); + void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient); }; // Static instance to initialise plugin @@ -56,24 +56,26 @@ static class TCPIOPlugin : public Plugin { } void initialize(Target& target) { - broker::Broker* broker = dynamic_cast(&target); // Only provide to a Broker if (broker) { const broker::Broker::Options& opts = broker->getOptions(); - Acceptor::shared_ptr acceptor(new AsynchIOAcceptor(opts.port, opts.connectionBacklog)); - QPID_LOG(info, "Listening on TCP port " << acceptor->getPort()); - broker->registerAccepter(acceptor); + ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog)); + QPID_LOG(info, "Listening on TCP port " << protocol->getPort()); + broker->registerProtocolFactory(protocol); } } -} acceptor; +} tcpPlugin; -AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) : +AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) : listeningPort(listener.listen(port, backlog)) {} -void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) { +void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f, bool isClient) { AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f); + if (isClient) + async->setClient(); AsynchIO* aio = new AsynchIO(s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), @@ -85,40 +87,30 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn aio->start(poller); } - -uint16_t AsynchIOAcceptor::getPort() const { +uint16_t AsynchIOProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } -std::string AsynchIOAcceptor::getHost() const { +std::string AsynchIOProtocolFactory::getHost() const { return listener.getSockname(); } -void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { +void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { acceptor.reset( new AsynchAcceptor(listener, - boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact))); + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); acceptor->start(poller); } -void AsynchIOAcceptor::connect( +void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, int16_t port, ConnectionCodec::Factory* f) { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); - AsynchIOHandler* async = new AsynchIOHandler(socket->getPeerAddress(), f); - async->setClient(); - AsynchIO* aio = new AsynchIO(*socket, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, 4); - aio->start(poller); + + established(poller, *socket, f, true); } }} // namespace qpid::sys -- cgit v1.2.1