#include #include #include #include #include #include "Aggr.h" using std::tr1::bind; using std::tr1::placeholders::_1; using apache::thrift::TException; using apache::thrift::protocol::TBinaryProtocolFactory; using apache::thrift::protocol::TProtocolFactory; using apache::thrift::async::TEvhttpServer; using apache::thrift::async::TAsyncProcessor; using apache::thrift::async::TAsyncBufferProcessor; using apache::thrift::async::TAsyncProtocolProcessor; using apache::thrift::async::TAsyncChannel; using apache::thrift::async::TEvhttpClientChannel; class AggrAsyncHandler : public AggrCobSvIf { protected: struct RequestContext { std::tr1::function const& _return)> cob; std::vector ret; int pending_calls; }; public: AggrAsyncHandler() : eb_(nullptr) , pfact_(new TBinaryProtocolFactory()) { leaf_ports_.push_back(8081); leaf_ports_.push_back(8082); } void addValue(std::tr1::function cob, const int32_t value) { // Silently drop writes to the aggrgator. return cob(); } void getValues(std::tr1::function const& _return)> cob, std::tr1::function exn_cob) { RequestContext* ctx = new RequestContext(); ctx->cob = cob; ctx->pending_calls = leaf_ports_.size(); for (std::vector::iterator it = leaf_ports_.begin(); it != leaf_ports_.end(); ++it) { boost::shared_ptr channel( new TEvhttpClientChannel( "localhost", "/", "127.0.0.1", *it, eb_)); AggrCobClient* client = new AggrCobClient(channel, pfact_.get()); client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1)); } } void setEventBase(struct event_base* eb) { eb_ = eb; } void clientReturn(RequestContext* ctx, AggrCobClient* client) { ctx->pending_calls -= 1; try { std::vector subret; client->recv_getValues(subret); ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end()); } catch (TException& exn) { // TODO: Log error } delete client; if (ctx->pending_calls == 0) { ctx->cob(ctx->ret); delete ctx; } } protected: struct event_base* eb_; std::vector leaf_ports_; boost::shared_ptr pfact_; }; int main() { boost::shared_ptr handler(new AggrAsyncHandler()); boost::shared_ptr proc(new AggrAsyncProcessor(handler)); boost::shared_ptr pfact(new TBinaryProtocolFactory()); boost::shared_ptr bufproc(new TAsyncProtocolProcessor(proc, pfact)); boost::shared_ptr server(new TEvhttpServer(bufproc, 8080)); handler->setEventBase(server->getEventBase()); server->serve(); }