diff options
author | Andrii Maletskyi <andrii.maletskyi@gmail.com> | 2020-05-13 20:18:17 +0300 |
---|---|---|
committer | Asif Saif Uddin <auvipy@gmail.com> | 2020-05-15 07:11:01 +0600 |
commit | 5101db177c1b2cee0febcc64ad0eb49b39c775e8 (patch) | |
tree | 50200b78694d7895f6e6e7944d1f4befd7943884 /kombu | |
parent | e316313222c03af4a85d8d50a01718f86d7a58d4 (diff) | |
download | kombu-5101db177c1b2cee0febcc64ad0eb49b39c775e8.tar.gz |
fix 100% cpu usage on linux while using sqs
fix https://github.com/celery/celery/issues/5299
Diffstat (limited to 'kombu')
-rw-r--r-- | kombu/asynchronous/http/curl.py | 57 |
1 files changed, 36 insertions, 21 deletions
diff --git a/kombu/asynchronous/http/curl.py b/kombu/asynchronous/http/curl.py index 10b17959..cdfc84bb 100644 --- a/kombu/asynchronous/http/curl.py +++ b/kombu/asynchronous/http/curl.py @@ -75,36 +75,47 @@ class CurlClient(BaseClient): self._set_timeout(0) return request + # the next two methods are used for linux/epoll workaround: + # we temporarily remove all curl fds from hub, so curl cannot + # close a fd which is still inside epoll + def _pop_from_hub(self): + for fd in self._fds: + self.hub.remove(fd) + + def _push_to_hub(self): + for fd, events in self._fds.items(): + if events & READ: + self.hub.add_reader(fd, self.on_readable, fd) + if events & WRITE: + self.hub.add_writer(fd, self.on_writable, fd) + def _handle_socket(self, event, fd, multi, data, _pycurl=pycurl): if event == _pycurl.POLL_REMOVE: if fd in self._fds: - self.hub.remove(fd) self._fds.pop(fd, None) else: - if fd in self._fds: - self.hub.remove(fd) if event == _pycurl.POLL_IN: - self.hub.add_reader(fd, self.on_readable, fd) self._fds[fd] = READ elif event == _pycurl.POLL_OUT: - self.hub.add_writer(fd, self.on_writable, fd) self._fds[fd] = WRITE elif event == _pycurl.POLL_INOUT: - self.hub.add_reader(fd, self.on_readable, fd) - self.hub.add_writer(fd, self.on_writable, fd) self._fds[fd] = READ | WRITE def _set_timeout(self, msecs): pass # TODO def _timeout_check(self, _pycurl=pycurl): - while 1: - try: - ret, _ = self._multi.socket_all() - except pycurl.error as exc: - ret = exc.args[0] - if ret != _pycurl.E_CALL_MULTI_PERFORM: - break + self._pop_from_hub() + try: + while 1: + try: + ret, _ = self._multi.socket_all() + except pycurl.error as exc: + ret = exc.args[0] + if ret != _pycurl.E_CALL_MULTI_PERFORM: + break + finally: + self._push_to_hub() self._process_pending_requests() def on_readable(self, fd, _pycurl=pycurl): @@ -114,13 +125,17 @@ class CurlClient(BaseClient): return self._on_event(fd, _pycurl.CSELECT_OUT) def _on_event(self, fd, event, _pycurl=pycurl): - while 1: - try: - ret, _ = self._socket_action(fd, event) - except pycurl.error as exc: - ret = exc.args[0] - if ret != _pycurl.E_CALL_MULTI_PERFORM: - break + self._pop_from_hub() + try: + while 1: + try: + ret, _ = self._socket_action(fd, event) + except pycurl.error as exc: + ret = exc.args[0] + if ret != _pycurl.E_CALL_MULTI_PERFORM: + break + finally: + self._push_to_hub() self._process_pending_requests() def _process_pending_requests(self): |