summaryrefslogtreecommitdiff
path: root/contrib/zeromq/TZmqServer.py
blob: 15c1543ac6d55b0dbd071ba11075330bfce7f931 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import logging
import zmq
import thrift.server.TServer
import thrift.transport.TTransport


class TZmqServer(thrift.server.TServer.TServer):
    def __init__(self, processor, ctx, endpoint, sock_type):
        thrift.server.TServer.TServer.__init__(self, processor, None)
        self.zmq_type = sock_type
        self.socket = ctx.socket(sock_type)
        self.socket.bind(endpoint)

    def serveOne(self):
        msg = self.socket.recv()
        itrans = thrift.transport.TTransport.TMemoryBuffer(msg)
        otrans = thrift.transport.TTransport.TMemoryBuffer()
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)

        try:
            self.processor.process(iprot, oprot)
        except Exception:
            logging.exception("Exception while processing request")
            # Fall through and send back a response, even if empty or incomplete.

        if self.zmq_type == zmq.REP:
            msg = otrans.getvalue()
            self.socket.send(msg)

    def serve(self):
        while True:
            self.serveOne()


class TZmqMultiServer(object):
    def __init__(self):
        self.servers = []

    def serveOne(self, timeout=-1):
        self._serveActive(self._setupPoll(), timeout)

    def serveForever(self):
        poll_info = self._setupPoll()
        while True:
            self._serveActive(poll_info, -1)

    def _setupPoll(self):
        server_map = {}
        poller = zmq.Poller()
        for server in self.servers:
            server_map[server.socket] = server
            poller.register(server.socket, zmq.POLLIN)
        return (server_map, poller)

    def _serveActive(self, poll_info, timeout):
        (server_map, poller) = poll_info
        ready = dict(poller.poll())
        for sock, state in ready.items():
            assert (state & zmq.POLLIN) != 0
            server_map[sock].serveOne()