diff options
author | Yiyang Zhou <yiyangzhou123@ucla.edu> | 2021-08-04 21:55:04 +0800 |
---|---|---|
committer | Jens Geyer <Jens-G@users.noreply.github.com> | 2022-04-23 10:30:46 +0200 |
commit | 88a45ac77518eafb57db08938ecdf38c5fcf7a31 (patch) | |
tree | 8ef05402b7c300a3c4b1c3a2240489cdcc010d9b /lib | |
parent | b664cfe2533e4bbf00fd5e7e0211bf7161ee2a04 (diff) | |
download | thrift-88a45ac77518eafb57db08938ecdf38c5fcf7a31.tar.gz |
THRIFT-5449: Use poll instead of select in Python TNonblockingServer if available
Client: Python
Diffstat (limited to 'lib')
-rw-r--r-- | lib/py/src/server/TNonblockingServer.py | 46 |
1 files changed, 44 insertions, 2 deletions
diff --git a/lib/py/src/server/TNonblockingServer.py b/lib/py/src/server/TNonblockingServer.py index ac0649651..fdf6779ad 100644 --- a/lib/py/src/server/TNonblockingServer.py +++ b/lib/py/src/server/TNonblockingServer.py @@ -253,6 +253,7 @@ class TNonblockingServer(object): self._read, self._write = socket.socketpair() self.prepared = False self._stop = False + self.poll = select.poll() if hasattr(select, 'poll') else None def setNumThreads(self, num): """Set the number of worker threads that should be created.""" @@ -318,13 +319,53 @@ class TNonblockingServer(object): else: return select.select(readable, writable, readable) + (True,) + def _poll_select(self): + """Does poll on open connections, if available.""" + remaining = [] + + self.poll.register(self.socket.handle.fileno(), select.POLLIN | select.POLLRDNORM) + self.poll.register(self._read.fileno(), select.POLLIN | select.POLLRDNORM) + + for i, connection in list(self.clients.items()): + if connection.is_readable(): + self.poll.register(connection.fileno(), select.POLLIN | select.POLLRDNORM | select.POLLERR | select.POLLHUP | select.POLLNVAL) + if connection.remaining or connection.received: + remaining.append(connection.fileno()) + if connection.is_writeable(): + self.poll.register(connection.fileno(), select.POLLOUT | select.POLLWRNORM) + if connection.is_closed(): + try: + self.poll.unregister(i) + except KeyError: + logger.debug("KeyError in unregistering connections...") + del self.clients[i] + if remaining: + return remaining, [], [], False + + rlist = [] + wlist = [] + xlist = [] + pollres = self.poll.poll() + for fd, event in pollres: + if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL): + xlist.append(fd) + elif event & (select.POLLOUT | select.POLLWRNORM): + wlist.append(fd) + elif event & (select.POLLIN | select.POLLRDNORM): + rlist.append(fd) + else: # should be impossible + logger.debug("reached an impossible state in _poll_select") + xlist.append(fd) + + return rlist, wlist, xlist, True + def handle(self): """Handle requests. WARNING! You must call prepare() BEFORE calling handle() """ assert self.prepared, "You have to call prepare before handle" - rset, wset, xset, selected = self._select() + rset, wset, xset, selected = self._select() if not self.poll else self._poll_select() for readable in rset: if readable == self._read.fileno(): # don't care i just need to clean readable flag @@ -343,6 +384,8 @@ class TNonblockingServer(object): connection.read() if connection.received: connection.status = WAIT_PROCESS + if self.poll: + self.poll.unregister(connection.fileno()) msg = connection.received.popleft() itransport = TTransport.TMemoryBuffer(msg.buffer, msg.offset) otransport = TTransport.TMemoryBuffer() @@ -354,7 +397,6 @@ class TNonblockingServer(object): self.clients[writeable].write() for oob in xset: self.clients[oob].close() - del self.clients[oob] def close(self): """Closes the server.""" |