summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrii Maletskyi <andrii.maletskyi@gmail.com>2020-05-13 20:18:17 +0300
committerAsif Saif Uddin <auvipy@gmail.com>2020-05-15 07:11:01 +0600
commit5101db177c1b2cee0febcc64ad0eb49b39c775e8 (patch)
tree50200b78694d7895f6e6e7944d1f4befd7943884
parente316313222c03af4a85d8d50a01718f86d7a58d4 (diff)
downloadkombu-5101db177c1b2cee0febcc64ad0eb49b39c775e8.tar.gz
fix 100% cpu usage on linux while using sqs
fix https://github.com/celery/celery/issues/5299
-rw-r--r--kombu/asynchronous/http/curl.py57
-rw-r--r--t/unit/asynchronous/http/test_curl.py30
2 files changed, 50 insertions, 37 deletions
diff --git a/kombu/asynchronous/http/curl.py b/kombu/asynchronous/http/curl.py
index 10b17959..cdfc84bb 100644
--- a/kombu/asynchronous/http/curl.py
+++ b/kombu/asynchronous/http/curl.py
@@ -75,36 +75,47 @@ class CurlClient(BaseClient):
self._set_timeout(0)
return request
+ # the next two methods are used for linux/epoll workaround:
+ # we temporarily remove all curl fds from hub, so curl cannot
+ # close a fd which is still inside epoll
+ def _pop_from_hub(self):
+ for fd in self._fds:
+ self.hub.remove(fd)
+
+ def _push_to_hub(self):
+ for fd, events in self._fds.items():
+ if events & READ:
+ self.hub.add_reader(fd, self.on_readable, fd)
+ if events & WRITE:
+ self.hub.add_writer(fd, self.on_writable, fd)
+
def _handle_socket(self, event, fd, multi, data, _pycurl=pycurl):
if event == _pycurl.POLL_REMOVE:
if fd in self._fds:
- self.hub.remove(fd)
self._fds.pop(fd, None)
else:
- if fd in self._fds:
- self.hub.remove(fd)
if event == _pycurl.POLL_IN:
- self.hub.add_reader(fd, self.on_readable, fd)
self._fds[fd] = READ
elif event == _pycurl.POLL_OUT:
- self.hub.add_writer(fd, self.on_writable, fd)
self._fds[fd] = WRITE
elif event == _pycurl.POLL_INOUT:
- self.hub.add_reader(fd, self.on_readable, fd)
- self.hub.add_writer(fd, self.on_writable, fd)
self._fds[fd] = READ | WRITE
def _set_timeout(self, msecs):
pass # TODO
def _timeout_check(self, _pycurl=pycurl):
- while 1:
- try:
- ret, _ = self._multi.socket_all()
- except pycurl.error as exc:
- ret = exc.args[0]
- if ret != _pycurl.E_CALL_MULTI_PERFORM:
- break
+ self._pop_from_hub()
+ try:
+ while 1:
+ try:
+ ret, _ = self._multi.socket_all()
+ except pycurl.error as exc:
+ ret = exc.args[0]
+ if ret != _pycurl.E_CALL_MULTI_PERFORM:
+ break
+ finally:
+ self._push_to_hub()
self._process_pending_requests()
def on_readable(self, fd, _pycurl=pycurl):
@@ -114,13 +125,17 @@ class CurlClient(BaseClient):
return self._on_event(fd, _pycurl.CSELECT_OUT)
def _on_event(self, fd, event, _pycurl=pycurl):
- while 1:
- try:
- ret, _ = self._socket_action(fd, event)
- except pycurl.error as exc:
- ret = exc.args[0]
- if ret != _pycurl.E_CALL_MULTI_PERFORM:
- break
+ self._pop_from_hub()
+ try:
+ while 1:
+ try:
+ ret, _ = self._socket_action(fd, event)
+ except pycurl.error as exc:
+ ret = exc.args[0]
+ if ret != _pycurl.E_CALL_MULTI_PERFORM:
+ break
+ finally:
+ self._push_to_hub()
self._process_pending_requests()
def _process_pending_requests(self):
diff --git a/t/unit/asynchronous/http/test_curl.py b/t/unit/asynchronous/http/test_curl.py
index 553b0ea7..d38a819f 100644
--- a/t/unit/asynchronous/http/test_curl.py
+++ b/t/unit/asynchronous/http/test_curl.py
@@ -64,48 +64,35 @@ class test_CurlClient:
def test_handle_socket(self):
with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
- hub = Mock(name='hub')
- x = self.Client(hub)
+ x = self.Client()
fd = Mock(name='fd1')
# POLL_REMOVE
x._fds[fd] = fd
x._handle_socket(_pycurl.POLL_REMOVE, fd, x._multi, None, _pycurl)
- hub.remove.assert_called_with(fd)
assert fd not in x._fds
x._handle_socket(_pycurl.POLL_REMOVE, fd, x._multi, None, _pycurl)
# POLL_IN
- hub = x.hub = Mock(name='hub')
fds = [fd, Mock(name='fd2'), Mock(name='fd3')]
x._fds = {f: f for f in fds}
x._handle_socket(_pycurl.POLL_IN, fd, x._multi, None, _pycurl)
- hub.remove.assert_has_calls([call(fd)])
- hub.add_reader.assert_called_with(fd, x.on_readable, fd)
assert x._fds[fd] == READ
# POLL_OUT
- hub = x.hub = Mock(name='hub')
x._handle_socket(_pycurl.POLL_OUT, fd, x._multi, None, _pycurl)
- hub.add_writer.assert_called_with(fd, x.on_writable, fd)
assert x._fds[fd] == WRITE
# POLL_INOUT
- hub = x.hub = Mock(name='hub')
x._handle_socket(_pycurl.POLL_INOUT, fd, x._multi, None, _pycurl)
- hub.add_reader.assert_called_with(fd, x.on_readable, fd)
- hub.add_writer.assert_called_with(fd, x.on_writable, fd)
assert x._fds[fd] == READ | WRITE
# UNKNOWN EVENT
- hub = x.hub = Mock(name='hub')
x._handle_socket(0xff3f, fd, x._multi, None, _pycurl)
# FD NOT IN FDS
- hub = x.hub = Mock(name='hub')
x._fds.clear()
x._handle_socket(0xff3f, fd, x._multi, None, _pycurl)
- hub.remove.assert_not_called()
def test_set_timeout(self):
x = self.Client()
@@ -113,11 +100,22 @@ class test_CurlClient:
def test_timeout_check(self):
with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
- x = self.Client()
+ hub = Mock(name='hub')
+ x = self.Client(hub)
+ fd1, fd2 = Mock(name='fd1'), Mock(name='fd2')
+ x._fds = {fd1: READ}
x._process_pending_requests = Mock(name='process_pending')
- x._multi.socket_all.return_value = 333, 1
+
+ def _side_effect():
+ x._fds = {fd2: WRITE}
+ return 333, 1
+
+ x._multi.socket_all.side_effect = _side_effect
_pycurl.error = KeyError
+
x._timeout_check(_pycurl=_pycurl)
+ hub.remove.assert_called_with(fd1)
+ hub.add_writer.assert_called_with(fd2, x.on_writable, fd2)
x._multi.socket_all.return_value = None
x._multi.socket_all.side_effect = _pycurl.error(333)