summaryrefslogtreecommitdiff
path: root/contrib/async-test/test-server.cpp
blob: 9e696c92cfbb8e2d99259996559d0ba8c0894cb4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#include <tr1/functional>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/async/TAsyncProtocolProcessor.h>
#include <thrift/async/TEvhttpServer.h>
#include <thrift/async/TEvhttpClientChannel.h>
#include "Aggr.h"

using std::tr1::bind;
using std::tr1::placeholders::_1;

using apache::thrift::TException;
using apache::thrift::protocol::TBinaryProtocolFactory;
using apache::thrift::protocol::TProtocolFactory;
using apache::thrift::async::TEvhttpServer;
using apache::thrift::async::TAsyncProcessor;
using apache::thrift::async::TAsyncBufferProcessor;
using apache::thrift::async::TAsyncProtocolProcessor;
using apache::thrift::async::TAsyncChannel;
using apache::thrift::async::TEvhttpClientChannel;

class AggrAsyncHandler : public AggrCobSvIf {
 protected:
  struct RequestContext {
    std::tr1::function<void(std::vector<int32_t> const& _return)> cob;
    std::vector<int32_t> ret;
    int pending_calls;
  };

 public:
  AggrAsyncHandler()
    : eb_(nullptr)
    , pfact_(new TBinaryProtocolFactory())
  {
    leaf_ports_.push_back(8081);
    leaf_ports_.push_back(8082);
  }

  void addValue(std::tr1::function<void()> cob, const int32_t value) {
    // Silently drop writes to the aggrgator.
    return cob();
  }

  void getValues(std::tr1::function<void(
        std::vector<int32_t> const& _return)> cob,
      std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
    RequestContext* ctx = new RequestContext();
    ctx->cob = cob;
    ctx->pending_calls = leaf_ports_.size();
    for (std::vector<int>::iterator it = leaf_ports_.begin();
        it != leaf_ports_.end(); ++it) {
      boost::shared_ptr<TAsyncChannel> channel(
          new TEvhttpClientChannel(
            "localhost", "/", "127.0.0.1", *it, eb_));
      AggrCobClient* client = new AggrCobClient(channel, pfact_.get());
      client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1));
    }
  }

  void setEventBase(struct event_base* eb) {
    eb_ = eb;
  }

  void clientReturn(RequestContext* ctx, AggrCobClient* client) {
    ctx->pending_calls -= 1;

    try {
      std::vector<int32_t> subret;
      client->recv_getValues(subret);
      ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end());
    } catch (TException& exn) {
      // TODO: Log error
    }

    delete client;

    if (ctx->pending_calls == 0) {
      ctx->cob(ctx->ret);
      delete ctx;
    }
  }

 protected:
  struct event_base* eb_;
  std::vector<int> leaf_ports_;
  boost::shared_ptr<TProtocolFactory> pfact_;
};


int main() {
  boost::shared_ptr<AggrAsyncHandler> handler(new AggrAsyncHandler());
  boost::shared_ptr<TAsyncProcessor> proc(new AggrAsyncProcessor(handler));
  boost::shared_ptr<TProtocolFactory> pfact(new TBinaryProtocolFactory());
  boost::shared_ptr<TAsyncBufferProcessor> bufproc(new TAsyncProtocolProcessor(proc, pfact));
  boost::shared_ptr<TEvhttpServer> server(new TEvhttpServer(bufproc, 8080));
  handler->setEventBase(server->getEventBase());
  server->serve();
}