summaryrefslogtreecommitdiff
path: root/kombu
diff options
context:
space:
mode:
authorAndrii Maletskyi <andrii.maletskyi@gmail.com>2020-05-13 20:18:17 +0300
committerAsif Saif Uddin <auvipy@gmail.com>2020-05-15 07:11:01 +0600
commit5101db177c1b2cee0febcc64ad0eb49b39c775e8 (patch)
tree50200b78694d7895f6e6e7944d1f4befd7943884 /kombu
parente316313222c03af4a85d8d50a01718f86d7a58d4 (diff)
downloadkombu-5101db177c1b2cee0febcc64ad0eb49b39c775e8.tar.gz
fix 100% cpu usage on linux while using sqs
fix https://github.com/celery/celery/issues/5299
Diffstat (limited to 'kombu')
-rw-r--r--kombu/asynchronous/http/curl.py57
1 files changed, 36 insertions, 21 deletions
diff --git a/kombu/asynchronous/http/curl.py b/kombu/asynchronous/http/curl.py
index 10b17959..cdfc84bb 100644
--- a/kombu/asynchronous/http/curl.py
+++ b/kombu/asynchronous/http/curl.py
@@ -75,36 +75,47 @@ class CurlClient(BaseClient):
self._set_timeout(0)
return request
+ # the next two methods are used for linux/epoll workaround:
+ # we temporarily remove all curl fds from hub, so curl cannot
+ # close a fd which is still inside epoll
+ def _pop_from_hub(self):
+ for fd in self._fds:
+ self.hub.remove(fd)
+
+ def _push_to_hub(self):
+ for fd, events in self._fds.items():
+ if events & READ:
+ self.hub.add_reader(fd, self.on_readable, fd)
+ if events & WRITE:
+ self.hub.add_writer(fd, self.on_writable, fd)
+
def _handle_socket(self, event, fd, multi, data, _pycurl=pycurl):
if event == _pycurl.POLL_REMOVE:
if fd in self._fds:
- self.hub.remove(fd)
self._fds.pop(fd, None)
else:
- if fd in self._fds:
- self.hub.remove(fd)
if event == _pycurl.POLL_IN:
- self.hub.add_reader(fd, self.on_readable, fd)
self._fds[fd] = READ
elif event == _pycurl.POLL_OUT:
- self.hub.add_writer(fd, self.on_writable, fd)
self._fds[fd] = WRITE
elif event == _pycurl.POLL_INOUT:
- self.hub.add_reader(fd, self.on_readable, fd)
- self.hub.add_writer(fd, self.on_writable, fd)
self._fds[fd] = READ | WRITE
def _set_timeout(self, msecs):
pass # TODO
def _timeout_check(self, _pycurl=pycurl):
- while 1:
- try:
- ret, _ = self._multi.socket_all()
- except pycurl.error as exc:
- ret = exc.args[0]
- if ret != _pycurl.E_CALL_MULTI_PERFORM:
- break
+ self._pop_from_hub()
+ try:
+ while 1:
+ try:
+ ret, _ = self._multi.socket_all()
+ except pycurl.error as exc:
+ ret = exc.args[0]
+ if ret != _pycurl.E_CALL_MULTI_PERFORM:
+ break
+ finally:
+ self._push_to_hub()
self._process_pending_requests()
def on_readable(self, fd, _pycurl=pycurl):
@@ -114,13 +125,17 @@ class CurlClient(BaseClient):
return self._on_event(fd, _pycurl.CSELECT_OUT)
def _on_event(self, fd, event, _pycurl=pycurl):
- while 1:
- try:
- ret, _ = self._socket_action(fd, event)
- except pycurl.error as exc:
- ret = exc.args[0]
- if ret != _pycurl.E_CALL_MULTI_PERFORM:
- break
+ self._pop_from_hub()
+ try:
+ while 1:
+ try:
+ ret, _ = self._socket_action(fd, event)
+ except pycurl.error as exc:
+ ret = exc.args[0]
+ if ret != _pycurl.E_CALL_MULTI_PERFORM:
+ break
+ finally:
+ self._push_to_hub()
self._process_pending_requests()
def _process_pending_requests(self):