diff options
author | Andrii Maletskyi <andrii.maletskyi@gmail.com> | 2020-05-13 20:18:17 +0300 |
---|---|---|
committer | Asif Saif Uddin <auvipy@gmail.com> | 2020-05-15 07:11:01 +0600 |
commit | 5101db177c1b2cee0febcc64ad0eb49b39c775e8 (patch) | |
tree | 50200b78694d7895f6e6e7944d1f4befd7943884 | |
parent | e316313222c03af4a85d8d50a01718f86d7a58d4 (diff) | |
download | kombu-5101db177c1b2cee0febcc64ad0eb49b39c775e8.tar.gz |
fix 100% cpu usage on linux while using sqs
fix https://github.com/celery/celery/issues/5299
-rw-r--r-- | kombu/asynchronous/http/curl.py | 57 | ||||
-rw-r--r-- | t/unit/asynchronous/http/test_curl.py | 30 |
2 files changed, 50 insertions, 37 deletions
diff --git a/kombu/asynchronous/http/curl.py b/kombu/asynchronous/http/curl.py index 10b17959..cdfc84bb 100644 --- a/kombu/asynchronous/http/curl.py +++ b/kombu/asynchronous/http/curl.py @@ -75,36 +75,47 @@ class CurlClient(BaseClient): self._set_timeout(0) return request + # the next two methods are used for linux/epoll workaround: + # we temporarily remove all curl fds from hub, so curl cannot + # close a fd which is still inside epoll + def _pop_from_hub(self): + for fd in self._fds: + self.hub.remove(fd) + + def _push_to_hub(self): + for fd, events in self._fds.items(): + if events & READ: + self.hub.add_reader(fd, self.on_readable, fd) + if events & WRITE: + self.hub.add_writer(fd, self.on_writable, fd) + def _handle_socket(self, event, fd, multi, data, _pycurl=pycurl): if event == _pycurl.POLL_REMOVE: if fd in self._fds: - self.hub.remove(fd) self._fds.pop(fd, None) else: - if fd in self._fds: - self.hub.remove(fd) if event == _pycurl.POLL_IN: - self.hub.add_reader(fd, self.on_readable, fd) self._fds[fd] = READ elif event == _pycurl.POLL_OUT: - self.hub.add_writer(fd, self.on_writable, fd) self._fds[fd] = WRITE elif event == _pycurl.POLL_INOUT: - self.hub.add_reader(fd, self.on_readable, fd) - self.hub.add_writer(fd, self.on_writable, fd) self._fds[fd] = READ | WRITE def _set_timeout(self, msecs): pass # TODO def _timeout_check(self, _pycurl=pycurl): - while 1: - try: - ret, _ = self._multi.socket_all() - except pycurl.error as exc: - ret = exc.args[0] - if ret != _pycurl.E_CALL_MULTI_PERFORM: - break + self._pop_from_hub() + try: + while 1: + try: + ret, _ = self._multi.socket_all() + except pycurl.error as exc: + ret = exc.args[0] + if ret != _pycurl.E_CALL_MULTI_PERFORM: + break + finally: + self._push_to_hub() self._process_pending_requests() def on_readable(self, fd, _pycurl=pycurl): @@ -114,13 +125,17 @@ class CurlClient(BaseClient): return self._on_event(fd, _pycurl.CSELECT_OUT) def _on_event(self, fd, event, _pycurl=pycurl): - while 1: - try: - ret, _ = self._socket_action(fd, event) - except pycurl.error as exc: - ret = exc.args[0] - if ret != _pycurl.E_CALL_MULTI_PERFORM: - break + self._pop_from_hub() + try: + while 1: + try: + ret, _ = self._socket_action(fd, event) + except pycurl.error as exc: + ret = exc.args[0] + if ret != _pycurl.E_CALL_MULTI_PERFORM: + break + finally: + self._push_to_hub() self._process_pending_requests() def _process_pending_requests(self): diff --git a/t/unit/asynchronous/http/test_curl.py b/t/unit/asynchronous/http/test_curl.py index 553b0ea7..d38a819f 100644 --- a/t/unit/asynchronous/http/test_curl.py +++ b/t/unit/asynchronous/http/test_curl.py @@ -64,48 +64,35 @@ class test_CurlClient: def test_handle_socket(self): with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl: - hub = Mock(name='hub') - x = self.Client(hub) + x = self.Client() fd = Mock(name='fd1') # POLL_REMOVE x._fds[fd] = fd x._handle_socket(_pycurl.POLL_REMOVE, fd, x._multi, None, _pycurl) - hub.remove.assert_called_with(fd) assert fd not in x._fds x._handle_socket(_pycurl.POLL_REMOVE, fd, x._multi, None, _pycurl) # POLL_IN - hub = x.hub = Mock(name='hub') fds = [fd, Mock(name='fd2'), Mock(name='fd3')] x._fds = {f: f for f in fds} x._handle_socket(_pycurl.POLL_IN, fd, x._multi, None, _pycurl) - hub.remove.assert_has_calls([call(fd)]) - hub.add_reader.assert_called_with(fd, x.on_readable, fd) assert x._fds[fd] == READ # POLL_OUT - hub = x.hub = Mock(name='hub') x._handle_socket(_pycurl.POLL_OUT, fd, x._multi, None, _pycurl) - hub.add_writer.assert_called_with(fd, x.on_writable, fd) assert x._fds[fd] == WRITE # POLL_INOUT - hub = x.hub = Mock(name='hub') x._handle_socket(_pycurl.POLL_INOUT, fd, x._multi, None, _pycurl) - hub.add_reader.assert_called_with(fd, x.on_readable, fd) - hub.add_writer.assert_called_with(fd, x.on_writable, fd) assert x._fds[fd] == READ | WRITE # UNKNOWN EVENT - hub = x.hub = Mock(name='hub') x._handle_socket(0xff3f, fd, x._multi, None, _pycurl) # FD NOT IN FDS - hub = x.hub = Mock(name='hub') x._fds.clear() x._handle_socket(0xff3f, fd, x._multi, None, _pycurl) - hub.remove.assert_not_called() def test_set_timeout(self): x = self.Client() @@ -113,11 +100,22 @@ class test_CurlClient: def test_timeout_check(self): with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl: - x = self.Client() + hub = Mock(name='hub') + x = self.Client(hub) + fd1, fd2 = Mock(name='fd1'), Mock(name='fd2') + x._fds = {fd1: READ} x._process_pending_requests = Mock(name='process_pending') - x._multi.socket_all.return_value = 333, 1 + + def _side_effect(): + x._fds = {fd2: WRITE} + return 333, 1 + + x._multi.socket_all.side_effect = _side_effect _pycurl.error = KeyError + x._timeout_check(_pycurl=_pycurl) + hub.remove.assert_called_with(fd1) + hub.add_writer.assert_called_with(fd2, x.on_writable, fd2) x._multi.socket_all.return_value = None x._multi.socket_all.side_effect = _pycurl.error(333) |